diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java index 4b777ea..8d36265 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java @@ -30,6 +30,12 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -42,6 +48,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; @@ -110,4 +120,34 @@ public RefreshServiceAclsResponse refreshServiceAcls( public UpdateNodeResourceResponse updateNodeResource( UpdateNodeResourceRequest request) throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public AddToClusterNodeLabelsResponse addToClusterNodeLabels(AddToClusterNodeLabelsRequest request) + throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( + RemoveFromClusterNodeLabelsRequest request) throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( + ReplaceLabelsOnNodeRequest request) throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto index 47a6cf7..99dd4fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto @@ -39,4 +39,9 @@ service ResourceManagerAdministrationProtocolService { rpc refreshServiceAcls(RefreshServiceAclsRequestProto) returns (RefreshServiceAclsResponseProto); rpc getGroupsForUser(GetGroupsForUserRequestProto) returns (GetGroupsForUserResponseProto); rpc updateNodeResource (UpdateNodeResourceRequestProto) returns (UpdateNodeResourceResponseProto); + rpc addToClusterNodeLabels(AddToClusterNodeLabelsRequestProto) returns (AddToClusterNodeLabelsResponseProto); + rpc removeFromClusterNodeLabels(RemoveFromClusterNodeLabelsRequestProto) returns (RemoveFromClusterNodeLabelsResponseProto); + rpc replaceLabelsOnNodes(ReplaceLabelsOnNodeRequestProto) returns (ReplaceLabelsOnNodeResponseProto); + rpc getNodeToLabels(GetNodesToLabelsRequestProto) returns (GetNodesToLabelsResponseProto); + rpc getClusterNodeLabels(GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index 50e5825..deaf54d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -19,11 +19,17 @@ package org.apache.hadoop.yarn.client.cli; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; -import com.google.common.collect.ImmutableMap; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -33,6 +39,7 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.RMHAServiceTarget; import org.apache.hadoop.yarn.conf.HAUtil; @@ -41,13 +48,21 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; + +import com.google.common.collect.ImmutableMap; @Private @Unstable @@ -55,6 +70,8 @@ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + private boolean directlyAccessNodeLabelStore = false; + static CommonNodeLabelsManager localNodeLabelsManager = null; protected final static Map ADMIN_USAGE = ImmutableMap.builder() @@ -78,7 +95,30 @@ .put("-help", new UsageInfo("[cmd]", "Displays help for the given command or all commands if none " + "is specified.")) - .build(); + .put("-addToClusterNodeLabels", + new UsageInfo("[label1,label2,label3] (label splitted by \",\")", + "add to cluster node labels ")) + .put("-removeFromClusterNodeLabels", + new UsageInfo("[label1,label2,label3] (label splitted by \",\")", + "remove from cluster node labels")) + .put("-replaceLabelsOnNode", + new UsageInfo("[node1:port,label1,label2 node2:port,label1,label2]", + "replace labels on nodes")) + .put("-getNodeToLabels", new UsageInfo("", + "Get node to label mappings")) + .put("-getClusterNodeLabels", + new UsageInfo("", "Get node labels in the cluster")) + .put("-directlyAccessNodeLabelStore", + new UsageInfo("", "Directly access node label store, " + + "with this option, all node label related operations" + + " will not connect RM. Instead, they will" + + " access/modify stored node labels directly." + + " By default, it is false (access via RM)." + + " AND PLEASE NOTE: if you configured" + + " yarn.node-labels.fs-store.uri to a local directory" + + " (instead of NFS or HDFS), this option will only work" + + " when the command run on the machine where RM is running.")) + .build(); public RMAdminCLI() { super(); @@ -201,11 +241,13 @@ private static void printUsage(String cmd, boolean isHAEnabled) { ToolRunner.printGenericCommandUsage(System.err); } - - protected ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException { + + protected ResourceManagerAdministrationProtocol createAdminProtocol() + throws IOException { // Get the current configuration final YarnConfiguration conf = new YarnConfiguration(getConf()); - return ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class); + return ClientRMProxy.createRMProxy(conf, + ResourceManagerAdministrationProtocol.class); } private int refreshQueues() throws IOException, YarnException { @@ -285,8 +327,190 @@ private int getGroups(String[] usernames) throws IOException { return 0; } + // Make it protected to make unit test can change it. + protected void initNodeLabelManagerInstance() { + if (localNodeLabelsManager == null) { + localNodeLabelsManager = new CommonNodeLabelsManager(); + localNodeLabelsManager.init(getConf()); + localNodeLabelsManager.start(); + } + } + + private int addToClusterNodeLabels(String args) throws IOException, + YarnException { + Set labels = new HashSet(); + for (String p : args.split(",")) { + labels.add(p); + } + + return addToClusterNodeLabels(labels); + } + + private int addToClusterNodeLabels(Set labels) throws IOException, + YarnException { + if (directlyAccessNodeLabelStore) { + initNodeLabelManagerInstance(); + localNodeLabelsManager.addToCluserNodeLabels(labels); + } else { + ResourceManagerAdministrationProtocol adminProtocol = + createAdminProtocol(); + AddToClusterNodeLabelsRequest request = + AddToClusterNodeLabelsRequest.newInstance(labels); + adminProtocol.addToClusterNodeLabels(request); + } + return 0; + } + + private int removeFromClusterNodeLabels(String args) throws IOException, + YarnException { + Set labels = new HashSet(); + for (String p : args.split(",")) { + labels.add(p); + } + + if (directlyAccessNodeLabelStore) { + initNodeLabelManagerInstance(); + localNodeLabelsManager.removeFromClusterNodeLabels(labels); + } else { + ResourceManagerAdministrationProtocol adminProtocol = + createAdminProtocol(); + RemoveFromClusterNodeLabelsRequest request = + RemoveFromClusterNodeLabelsRequest.newInstance(labels); + adminProtocol.removeFromClusterNodeLabels(request); + } + + return 0; + } + + private int getNodeToLabels() throws IOException, YarnException { + Map> nodeToLabels = null; + + if (directlyAccessNodeLabelStore) { + initNodeLabelManagerInstance(); + nodeToLabels = localNodeLabelsManager.getNodeLabels(); + } else { + ResourceManagerAdministrationProtocol adminProtocol = + createAdminProtocol(); + + nodeToLabels = + adminProtocol.getNodeToLabels(GetNodesToLabelsRequest.newInstance()) + .getNodeToLabels(); + } + for (NodeId host : sortNodeIdSet(nodeToLabels.keySet())) { + System.out.println(String.format("Host=%s, Node-labels=[%s]", + (host.getPort() == 0 ? host.getHost() : host.toString()), + StringUtils.join(sortStrSet(nodeToLabels.get(host)), ","))); + } + return 0; + } + + private int getClusterNodeLabels() throws IOException, YarnException { + Set labels = null; + if (directlyAccessNodeLabelStore) { + initNodeLabelManagerInstance(); + labels = localNodeLabelsManager.getClusterNodeLabels(); + } else { + ResourceManagerAdministrationProtocol adminProto = createAdminProtocol(); + labels = + adminProto.getClusterNodeLabels( + GetClusterNodeLabelsRequest.newInstance()).getNodeLabels(); + } + + System.out.println(String.format("Node-labels=%s", + StringUtils.join(sortStrSet(labels).iterator(), ","))); + return 0; + } + + private List sortNodeIdSet(Set nodes) { + List list = new ArrayList(); + list.addAll(nodes); + Collections.sort(list); + return list; + } + + private List sortStrSet(Set labels) { + List list = new ArrayList(); + list.addAll(labels); + Collections.sort(list); + return list; + } + + private Map> buildNodeLabelsFromStr(String args) + throws IOException { + Map> map = new HashMap>(); + + for (String nodeToLabels : args.split("[ \n]")) { + nodeToLabels = nodeToLabels.trim(); + if (nodeToLabels.isEmpty() || nodeToLabels.startsWith("#")) { + continue; + } + + String[] splits = nodeToLabels.split(","); + String nodeIdStr = splits[0]; + + if (nodeIdStr.trim().isEmpty()) { + throw new IOException("node name cannot be empty"); + } + + String nodeName; + int port; + if (nodeIdStr.contains(":")) { + nodeName = nodeIdStr.substring(0, nodeIdStr.indexOf(":")); + port = Integer.valueOf(nodeIdStr.substring(nodeIdStr.indexOf(":"))); + } else { + nodeName = nodeIdStr; + port = 0; + } + + NodeId nodeId = NodeId.newInstance(nodeName, port); + + map.put(nodeId, new HashSet()); + + for (int i = 1; i < splits.length; i++) { + if (!splits[i].trim().isEmpty()) { + map.get(nodeId).add(splits[i].trim().toLowerCase()); + } + } + } + + return map; + } + + private int replaceLabelsOnNodes(String args) throws IOException, + YarnException { + Map> map = buildNodeLabelsFromStr(args); + return replaceLabelsOnNodes(map); + } + + private int replaceLabelsOnNodes(Map> map) + throws IOException, YarnException { + if (directlyAccessNodeLabelStore) { + initNodeLabelManagerInstance(); + localNodeLabelsManager.replaceLabelsOnNode(map); + } else { + ResourceManagerAdministrationProtocol adminProtocol = + createAdminProtocol(); + ReplaceLabelsOnNodeRequest request = + ReplaceLabelsOnNodeRequest.newInstance(map); + adminProtocol.replaceLabelsOnNode(request); + } + return 0; + } + @Override public int run(String[] args) throws Exception { + // -directlyAccessNodeLabelStore is a additional option for node label + // access, so just search if we have specified this option, and remove it + List argsList = new ArrayList(); + for (int i = 0; i < args.length; i++) { + if (args[i].equals("-directlyAccessNodeLabelStore")) { + directlyAccessNodeLabelStore = true; + } else { + argsList.add(args[i]); + } + } + args = argsList.toArray(new String[0]); + YarnConfiguration yarnConf = getConf() == null ? new YarnConfiguration() : new YarnConfiguration( getConf()); @@ -351,6 +575,31 @@ public int run(String[] args) throws Exception { } else if ("-getGroups".equals(cmd)) { String[] usernames = Arrays.copyOfRange(args, i, args.length); exitCode = getGroups(usernames); + } else if ("-addToClusterNodeLabels".equals(cmd)) { + if (i >= args.length) { + System.err.println("No cluster node-labels are specified"); + exitCode = -1; + } else { + exitCode = addToClusterNodeLabels(args[i]); + } + } else if ("-removeFromClusterNodeLabels".equals(cmd)) { + if (i >= args.length) { + System.err.println("No cluster node-labels are specified"); + exitCode = -1; + } else { + exitCode = removeFromClusterNodeLabels(args[i]); + } + } else if ("-replaceLabelsOnNode".equals(cmd)) { + if (i >= args.length) { + System.err.println("No cluster node-labels are specified"); + exitCode = -1; + } else { + exitCode = replaceLabelsOnNodes(args[i]); + } + } else if ("-getNodeToLabels".equals(cmd)) { + exitCode = getNodeToLabels(); + } else if ("-getClusterNodeLabels".equals(cmd)) { + exitCode = getClusterNodeLabels(); } else { exitCode = -1; System.err.println(cmd.substring(1) + ": Unknown command"); @@ -380,6 +629,9 @@ public int run(String[] args) throws Exception { System.err.println(cmd.substring(1) + ": " + e.getLocalizedMessage()); } + if (null != localNodeLabelsManager) { + localNodeLabelsManager.stop(); + } return exitCode; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java index 419b9ae..c11c08d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java @@ -16,18 +16,17 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.client; +package org.apache.hadoop.yarn.client.cli; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.never; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -37,9 +36,16 @@ import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceStatus; import org.apache.hadoop.ha.HAServiceTarget; +import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.client.cli.RMAdminCLI; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.nodelabels.DummyCommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; @@ -49,6 +55,10 @@ import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.google.common.collect.ImmutableSet; public class TestRMAdminCLI { @@ -56,10 +66,24 @@ private HAServiceProtocol haadmin; private RMAdminCLI rmAdminCLI; private RMAdminCLI rmAdminCLIWithHAEnabled; + private CommonNodeLabelsManager dummyNodeLabelsManager; + private boolean remoteAdminServiceAccessed = false; @Before - public void configure() throws IOException { + public void configure() throws IOException, YarnException { + remoteAdminServiceAccessed = false; + dummyNodeLabelsManager = new DummyCommonNodeLabelsManager(); admin = mock(ResourceManagerAdministrationProtocol.class); + when(admin.addToClusterNodeLabels(any(AddToClusterNodeLabelsRequest.class))) + .thenAnswer(new Answer() { + + @Override + public AddToClusterNodeLabelsResponse answer( + InvocationOnMock invocation) throws Throwable { + remoteAdminServiceAccessed = true; + return AddToClusterNodeLabelsResponse.newInstance(); + } + }); haadmin = mock(HAServiceProtocol.class); when(haadmin.getServiceStatus()).thenReturn(new HAServiceStatus( @@ -80,6 +104,12 @@ protected ResourceManagerAdministrationProtocol createAdminProtocol() protected HAServiceTarget resolveTarget(String rmId) { return haServiceTarget; } + + @SuppressWarnings("static-access") + @Override + protected void initNodeLabelManagerInstance() { + rmAdminCLI.localNodeLabelsManager = dummyNodeLabelsManager; + } }; YarnConfiguration conf = new YarnConfiguration(); @@ -360,6 +390,129 @@ public void testException() throws Exception { System.setErr(oldErrPrintStream); } } + + @Test + public void testAccessLocalNodeLabelManager() throws Exception { + assertFalse(dummyNodeLabelsManager.getServiceState() == STATE.STOPPED); + + String[] args = + { "-addToClusterNodeLabels", "x,y", "-directlyAccessNodeLabelStore" }; + assertEquals(0, rmAdminCLI.run(args)); + assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().containsAll( + ImmutableSet.of("x", "y"))); + + // reset localNodeLabelsManager + dummyNodeLabelsManager.removeFromClusterNodeLabels(ImmutableSet.of("x", "y")); + + // change the sequence of "-directlyAccessNodeLabelStore" and labels, + // should not matter + args = + new String[] { "-addToClusterNodeLabels", + "-directlyAccessNodeLabelStore", "x,y" }; + assertEquals(0, rmAdminCLI.run(args)); + assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().containsAll( + ImmutableSet.of("x", "y"))); + + // local node labels manager will be close after running + assertTrue(dummyNodeLabelsManager.getServiceState() == STATE.STOPPED); + } + + @Test + public void testAccessRemoteNodeLabelManager() throws Exception { + String[] args = + { "-addToClusterNodeLabels", "x,y" }; + assertEquals(0, rmAdminCLI.run(args)); + + // localNodeLabelsManager shouldn't accessed + assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().isEmpty()); + + // remote node labels manager accessed + assertTrue(remoteAdminServiceAccessed); + } + + @Test + public void testAddToClusterNodeLabels() throws Exception { + // successfully add labels + String[] args = + { "-addToClusterNodeLabels", "x", "-directlyAccessNodeLabelStore" }; + assertEquals(0, rmAdminCLI.run(args)); + assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().containsAll( + ImmutableSet.of("x"))); + + // no labels, should fail + args = new String[] { "-addToClusterNodeLabels" }; + assertTrue(0 != rmAdminCLI.run(args)); + + // no labels, should fail + args = + new String[] { "-addToClusterNodeLabels", + "-directlyAccessNodeLabelStore" }; + assertTrue(0 != rmAdminCLI.run(args)); + } + + @Test + public void testRemoveFromClusterNodeLabels() throws Exception { + // Successfully remove labels + dummyNodeLabelsManager.addToCluserNodeLabels(ImmutableSet.of("x")); + String[] args = + { "-removeFromClusterNodeLabels", "x", "-directlyAccessNodeLabelStore" }; + assertEquals(0, rmAdminCLI.run(args)); + assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().isEmpty()); + + // no labels, should fail + args = new String[] { "-removeFromClusterNodeLabels" }; + assertTrue(0 != rmAdminCLI.run(args)); + + // no labels, should fail + args = + new String[] { "-removeFromClusterNodeLabels", + "-directlyAccessNodeLabelStore" }; + assertTrue(0 != rmAdminCLI.run(args)); + } + + @Test + public void testReplaceLabelsOnNode() throws Exception { + // Successfully replace labels + dummyNodeLabelsManager.addToCluserNodeLabels(ImmutableSet.of("x", "y")); + String[] args = + { "-replaceLabelsOnNode", "node1,x,y node2,y", + "-directlyAccessNodeLabelStore" }; + assertEquals(0, rmAdminCLI.run(args)); + assertTrue(dummyNodeLabelsManager.getNodeLabels().containsKey( + NodeId.newInstance("node1", 0))); + assertTrue(dummyNodeLabelsManager.getNodeLabels().containsKey( + NodeId.newInstance("node2", 0))); + + // no labels, should fail + args = new String[] { "-replaceLabelsOnNode" }; + assertTrue(0 != rmAdminCLI.run(args)); + + // no labels, should fail + args = + new String[] { "-replaceLabelsOnNode", + "-directlyAccessNodeLabelStore" }; + assertTrue(0 != rmAdminCLI.run(args)); + } + + @Test + public void testGetClusterNodeLabels() throws Exception { + // Successfully get labels + dummyNodeLabelsManager.addToCluserNodeLabels(ImmutableSet.of("x", "y")); + String[] args = + { "-getClusterNodeLabels", + "-directlyAccessNodeLabelStore" }; + assertEquals(0, rmAdminCLI.run(args)); + } + + @Test + public void testGetNodeToLabels() throws Exception { + // Successfully get node-to-labels + dummyNodeLabelsManager.addToCluserNodeLabels(ImmutableSet.of("x", "y")); + String[] args = + { "-getNodeToLabels", + "-directlyAccessNodeLabelStore" }; + assertEquals(0, rmAdminCLI.run(args)); + } private void testError(String[] args, String template, ByteArrayOutputStream data, int resultCode) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java index 511b5ee..12a0864 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java @@ -236,7 +236,11 @@ protected void stopDispatcher() { protected void serviceStop() throws Exception { // finalize store stopDispatcher(); - store.close(); + + // only close store when we enabled store persistent + if (null != store) { + store.close(); + } } /** @@ -299,14 +303,14 @@ protected void internalAddLabelsToNode( for (Entry> entry : addedLabelsToNode.entrySet()) { NodeId nodeId = entry.getKey(); Set labels = entry.getValue(); - - createNodeIfNonExisted(entry.getKey()); - + + createHostIfNonExisted(nodeId.getHost()); if (nodeId.getPort() == WILDCARD_PORT) { Host host = nodeCollections.get(nodeId.getHost()); host.labels.addAll(labels); newNMToLabels.put(nodeId, host.labels); } else { + createNodeIfNonExisted(nodeId); Node nm = getNMInNodeSet(nodeId); if (nm.labels == null) { nm.labels = new HashSet(); @@ -534,21 +538,21 @@ protected void checkReplaceLabelsOnNode( @SuppressWarnings("unchecked") protected void internalReplaceLabelsOnNode( - Map> replaceLabelsToNode) { + Map> replaceLabelsToNode) throws IOException { // do replace labels to nodes Map> newNMToLabels = new HashMap>(); for (Entry> entry : replaceLabelsToNode.entrySet()) { NodeId nodeId = entry.getKey(); Set labels = entry.getValue(); - // update nodeCollections - createNodeIfNonExisted(entry.getKey()); + createHostIfNonExisted(nodeId.getHost()); if (nodeId.getPort() == WILDCARD_PORT) { Host host = nodeCollections.get(nodeId.getHost()); host.labels.clear(); host.labels.addAll(labels); newNMToLabels.put(nodeId, host.labels); } else { + createNodeIfNonExisted(nodeId); Node nm = getNMInNodeSet(nodeId); if (nm.labels == null) { nm.labels = new HashSet(); @@ -672,10 +676,6 @@ protected Node getNMInNodeSet(NodeId nodeId, Map map) { protected Node getNMInNodeSet(NodeId nodeId, Map map, boolean checkRunning) { - if (WILDCARD_PORT == nodeId.getPort()) { - return null; - } - Host host = map.get(nodeId.getHost()); if (null == host) { return null; @@ -707,17 +707,22 @@ protected Node getNMInNodeSet(NodeId nodeId, Map map, } } - protected void createNodeIfNonExisted(NodeId nodeId) { + protected void createNodeIfNonExisted(NodeId nodeId) throws IOException { Host host = nodeCollections.get(nodeId.getHost()); if (null == host) { - host = new Host(); - nodeCollections.put(nodeId.getHost(), host); + throw new IOException("Should create host before creating node."); } - if (nodeId.getPort() != WILDCARD_PORT) { - Node nm = host.nms.get(nodeId); - if (null == nm) { - host.nms.put(nodeId, new Node()); - } + Node nm = host.nms.get(nodeId); + if (null == nm) { + host.nms.put(nodeId, new Node()); + } + } + + protected void createHostIfNonExisted(String hostName) { + Host host = nodeCollections.get(hostName); + if (null == host) { + host = new Host(); + nodeCollections.put(hostName, host); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java index ccffaed..e4828e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java @@ -29,17 +29,28 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetClusterNodeLabelsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetNodesToLabelsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshSuperUserGroupsConfigurationRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -52,8 +63,18 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl; @@ -66,6 +87,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshSuperUserGroupsConfigurationResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl; @@ -205,5 +230,75 @@ public UpdateNodeResourceResponse updateNodeResource( return null; } } - + + @Override + public AddToClusterNodeLabelsResponse addToClusterNodeLabels( + AddToClusterNodeLabelsRequest request) throws YarnException, IOException { + AddToClusterNodeLabelsRequestProto requestProto = + ((AddToClusterNodeLabelsRequestPBImpl) request).getProto(); + try { + return new AddToClusterNodeLabelsResponsePBImpl( + proxy.addToClusterNodeLabels(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( + RemoveFromClusterNodeLabelsRequest request) throws YarnException, + IOException { + RemoveFromClusterNodeLabelsRequestProto requestProto = + ((RemoveFromClusterNodeLabelsRequestPBImpl) request).getProto(); + try { + return new RemoveFromClusterNodeLabelsResponsePBImpl( + proxy.removeFromClusterNodeLabels(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( + ReplaceLabelsOnNodeRequest request) throws YarnException, IOException { + ReplaceLabelsOnNodeRequestProto requestProto = + ((ReplaceLabelsOnNodeRequestPBImpl) request).getProto(); + try { + return new ReplaceLabelsOnNodeResponsePBImpl(proxy.replaceLabelsOnNodes( + null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels(GetNodesToLabelsRequest request) + throws YarnException, IOException { + GetNodesToLabelsRequestProto requestProto = + ((GetNodesToLabelsRequestPBImpl) request).getProto(); + try { + return new GetNodesToLabelsResponsePBImpl(proxy.getNodeToLabels( + null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + GetClusterNodeLabelsRequestProto requestProto = + ((GetClusterNodeLabelsRequestPBImpl) request).getProto(); + try { + return new GetClusterNodeLabelsResponsePBImpl(proxy.getClusterNodeLabels( + null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java index d1f71fe..7cfecad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java @@ -22,8 +22,14 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetClusterNodeLabelsResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetNodesToLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetNodesToLabelsResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto; @@ -36,17 +42,32 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshSuperUserGroupsConfigurationResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceResponseProto; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl; @@ -59,6 +80,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshSuperUserGroupsConfigurationResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl; @@ -204,4 +229,86 @@ public UpdateNodeResourceResponseProto updateNodeResource(RpcController controll } } + @Override + public AddToClusterNodeLabelsResponseProto addToClusterNodeLabels( + RpcController controller, AddToClusterNodeLabelsRequestProto proto) + throws ServiceException { + AddToClusterNodeLabelsRequestPBImpl request = + new AddToClusterNodeLabelsRequestPBImpl(proto); + try { + AddToClusterNodeLabelsResponse response = + real.addToClusterNodeLabels(request); + return ((AddToClusterNodeLabelsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public RemoveFromClusterNodeLabelsResponseProto removeFromClusterNodeLabels( + RpcController controller, RemoveFromClusterNodeLabelsRequestProto proto) + throws ServiceException { + RemoveFromClusterNodeLabelsRequestPBImpl request = + new RemoveFromClusterNodeLabelsRequestPBImpl(proto); + try { + RemoveFromClusterNodeLabelsResponse response = + real.removeFromClusterNodeLabels(request); + return ((RemoveFromClusterNodeLabelsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public ReplaceLabelsOnNodeResponseProto replaceLabelsOnNodes( + RpcController controller, ReplaceLabelsOnNodeRequestProto proto) + throws ServiceException { + ReplaceLabelsOnNodeRequestPBImpl request = + new ReplaceLabelsOnNodeRequestPBImpl(proto); + try { + ReplaceLabelsOnNodeResponse response = real.replaceLabelsOnNode(request); + return ((ReplaceLabelsOnNodeResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetNodesToLabelsResponseProto getNodeToLabels( + RpcController controller, GetNodesToLabelsRequestProto proto) + throws ServiceException { + GetNodesToLabelsRequestPBImpl request = + new GetNodesToLabelsRequestPBImpl(proto); + try { + GetNodesToLabelsResponse response = real.getNodeToLabels(request); + return ((GetNodesToLabelsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetClusterNodeLabelsResponseProto getClusterNodeLabels( + RpcController controller, GetClusterNodeLabelsRequestProto proto) + throws ServiceException { + GetClusterNodeLabelsRequestPBImpl request = + new GetClusterNodeLabelsRequestPBImpl(proto); + try { + GetClusterNodeLabelsResponse response = + real.getClusterNodeLabels(request); + return ((GetClusterNodeLabelsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java index fcdf969..65ea79f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java @@ -78,4 +78,9 @@ protected void startDispatcher() { protected void stopDispatcher() { // do nothing } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 2b7797f..ecdcb20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -57,6 +57,12 @@ import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -69,8 +75,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; @@ -618,4 +630,104 @@ public AccessControlList getAccessControlList() { public Server getServer() { return this.server; } + + @Override + public AddToClusterNodeLabelsResponse addToClusterNodeLabels(AddToClusterNodeLabelsRequest request) + throws YarnException, IOException { + String argName = "addToClusterNodeLabels"; + UserGroupInformation user = checkAcls(argName); + + if (!isRMActive()) { + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", + "ResourceManager is not active. Can not add labels."); + throwStandbyException(); + } + + AddToClusterNodeLabelsResponse response = + recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class); + try { + rmContext.getNodeLabelManager().addToCluserNodeLabels(request.getNodeLabels()); + RMAuditLogger + .logSuccess(user.getShortUserName(), argName, "AdminService"); + return response; + } catch (IOException ioe) { + LOG.info("Exception add labels", ioe); + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", "Exception add label"); + throw RPCUtil.getRemoteException(ioe); + } + } + + @Override + public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( + RemoveFromClusterNodeLabelsRequest request) throws YarnException, IOException { + String argName = "removeFromClusterNodeLabels"; + UserGroupInformation user = checkAcls(argName); + + if (!isRMActive()) { + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", + "ResourceManager is not active. Can not remove labels."); + throwStandbyException(); + } + + RemoveFromClusterNodeLabelsResponse response = + recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class); + try { + rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels()); + RMAuditLogger + .logSuccess(user.getShortUserName(), argName, "AdminService"); + return response; + } catch (IOException ioe) { + LOG.info("Exception remove labels", ioe); + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", "Exception remove label"); + throw RPCUtil.getRemoteException(ioe); + } + } + + @Override + public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( + ReplaceLabelsOnNodeRequest request) throws YarnException, IOException { + String argName = "replaceLabelsOnNode"; + UserGroupInformation user = checkAcls(argName); + + if (!isRMActive()) { + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", + "ResourceManager is not active. Can not set node to labels."); + throwStandbyException(); + } + + ReplaceLabelsOnNodeResponse response = + recordFactory.newRecordInstance(ReplaceLabelsOnNodeResponse.class); + try { + rmContext.getNodeLabelManager().replaceLabelsOnNode( + request.getNodeToLabels()); + RMAuditLogger + .logSuccess(user.getShortUserName(), argName, "AdminService"); + return response; + } catch (IOException ioe) { + LOG.info("Exception set node to labels. ", ioe); + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", + "Exception set node to labels."); + throw RPCUtil.getRemoteException(ioe); + } + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels(GetNodesToLabelsRequest request) + throws YarnException, IOException { + return GetNodesToLabelsResponsePBImpl.newInstance(rmContext + .getNodeLabelManager().getNodeLabels()); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels(GetClusterNodeLabelsRequest request) + throws YarnException, IOException { + return GetClusterNodeLabelsResponsePBImpl.newInstance(rmContext.getNodeLabelManager() + .getClusterNodeLabels()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java index 1d7f6f1..ba1727c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java @@ -181,7 +181,15 @@ public void activateNode(NodeId nodeId, Resource resource) { // save if we have a node before Map before = cloneNodeMap(ImmutableSet.of(nodeId)); - createNodeIfNonExisted(nodeId); + createHostIfNonExisted(nodeId.getHost()); + try { + createNodeIfNonExisted(nodeId); + } catch (IOException e) { + LOG.error("This shouldn't happen, cannot get host in nodeCollection" + + " associated to the node being activated"); + return; + } + Node nm = getNMInNodeSet(nodeId); nm.resource = resource; nm.running = true; @@ -220,7 +228,7 @@ public void deactivateNode(NodeId nodeId) { } } - public void updateNodeResource(NodeId node, Resource newResource) { + public void updateNodeResource(NodeId node, Resource newResource) throws IOException { deactivateNode(node); activateNode(node, newResource); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java index 81eead9..1fbe968 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java @@ -96,6 +96,14 @@ public void testNodeActiveDeactiveUpdate() throws Exception { Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null), Resources.add(SMALL_RESOURCE, LARGE_NODE)); } + + @Test(timeout = 5000) + public void testActivateNodeManagerWithZeroPort() throws Exception { + // active two NM, one is zero port , another is non-zero port. no exception + // should be raised + mgr.activateNode(NodeId.newInstance("n1", 0), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n1", 2), LARGE_NODE); + } @SuppressWarnings({ "unchecked", "rawtypes" }) @Test(timeout = 5000)