diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java index 4b777ea..8d36265 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java @@ -30,6 +30,12 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -42,6 +48,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; @@ -110,4 +120,34 @@ public RefreshServiceAclsResponse refreshServiceAcls( public UpdateNodeResourceResponse updateNodeResource( UpdateNodeResourceRequest request) throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public AddToClusterNodeLabelsResponse addToClusterNodeLabels(AddToClusterNodeLabelsRequest request) + throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( + RemoveFromClusterNodeLabelsRequest request) throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( + ReplaceLabelsOnNodeRequest request) throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto index 47a6cf7..99dd4fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto @@ -39,4 +39,9 @@ service ResourceManagerAdministrationProtocolService { rpc refreshServiceAcls(RefreshServiceAclsRequestProto) returns (RefreshServiceAclsResponseProto); rpc getGroupsForUser(GetGroupsForUserRequestProto) returns (GetGroupsForUserResponseProto); rpc updateNodeResource (UpdateNodeResourceRequestProto) returns (UpdateNodeResourceResponseProto); + rpc addToClusterNodeLabels(AddToClusterNodeLabelsRequestProto) returns (AddToClusterNodeLabelsResponseProto); + rpc removeFromClusterNodeLabels(RemoveFromClusterNodeLabelsRequestProto) returns (RemoveFromClusterNodeLabelsResponseProto); + rpc replaceLabelsOnNodes(ReplaceLabelsOnNodeRequestProto) returns (ReplaceLabelsOnNodeResponseProto); + rpc getNodeToLabels(GetNodesToLabelsRequestProto) returns (GetNodesToLabelsResponseProto); + rpc getClusterNodeLabels(GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index 50e5825..5e3f2a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -19,11 +19,17 @@ package org.apache.hadoop.yarn.client.cli; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; -import com.google.common.collect.ImmutableMap; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -33,6 +39,7 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.RMHAServiceTarget; import org.apache.hadoop.yarn.conf.HAUtil; @@ -41,13 +48,21 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; + +import com.google.common.collect.ImmutableMap; @Private @Unstable @@ -55,6 +70,8 @@ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + private boolean directlyAccessNodeLabelStore = false; + static CommonNodeLabelsManager localNodeLabelsManager = null; protected final static Map ADMIN_USAGE = ImmutableMap.builder() @@ -78,7 +95,30 @@ .put("-help", new UsageInfo("[cmd]", "Displays help for the given command or all commands if none " + "is specified.")) - .build(); + .put("-addToClusterNodeLabels", + new UsageInfo("[label1,label2,label3] (label splitted by \",\")", + "add to cluster node labels ")) + .put("-removeFromClusterNodeLabels", + new UsageInfo("[label1,label2,label3] (label splitted by \",\")", + "remove from cluster node labels")) + .put("-replaceLabelsOnNode", + new UsageInfo("[node1:port,label1,label2 node2:port,label1,label2]", + "replace labels on nodes")) + .put("-getNodeToLabels", new UsageInfo("", + "Get node to label mappings")) + .put("-getClusterNodeLabels", + new UsageInfo("", "Get node labels in the cluster")) + .put("-directlyAccessNodeLabelStore", + new UsageInfo("", "Directly access node label store, " + + "with this option, all node label related operations" + + " will not connect RM. Instead, they will" + + " access/modify stored node labels directly." + + " By default, it is false (access via RM)." + + " AND PLEASE NOTE: if you configured" + + " yarn.node-labels.fs-store.uri to a local directory" + + " (instead of NFS or HDFS), this option will only work" + + " when the command run on the machine where RM is running.")) + .build(); public RMAdminCLI() { super(); @@ -201,11 +241,13 @@ private static void printUsage(String cmd, boolean isHAEnabled) { ToolRunner.printGenericCommandUsage(System.err); } - - protected ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException { + + protected ResourceManagerAdministrationProtocol createAdminProtocol() + throws IOException { // Get the current configuration final YarnConfiguration conf = new YarnConfiguration(getConf()); - return ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class); + return ClientRMProxy.createRMProxy(conf, + ResourceManagerAdministrationProtocol.class); } private int refreshQueues() throws IOException, YarnException { @@ -285,8 +327,187 @@ private int getGroups(String[] usernames) throws IOException { return 0; } + // Make it protected to make unit test can change it. + protected static synchronized CommonNodeLabelsManager + getNodeLabelManagerInstance(Configuration conf) { + if (localNodeLabelsManager == null) { + localNodeLabelsManager = new CommonNodeLabelsManager(); + localNodeLabelsManager.init(conf); + localNodeLabelsManager.start(); + } + return localNodeLabelsManager; + } + + private int addToClusterNodeLabels(String args) throws IOException, + YarnException { + Set labels = new HashSet(); + for (String p : args.split(",")) { + labels.add(p); + } + + return addToClusterNodeLabels(labels); + } + + private int addToClusterNodeLabels(Set labels) throws IOException, + YarnException { + if (directlyAccessNodeLabelStore) { + getNodeLabelManagerInstance(getConf()).addToCluserNodeLabels(labels); + } else { + ResourceManagerAdministrationProtocol adminProtocol = + createAdminProtocol(); + AddToClusterNodeLabelsRequest request = + AddToClusterNodeLabelsRequest.newInstance(labels); + adminProtocol.addToClusterNodeLabels(request); + } + return 0; + } + + private int removeFromClusterNodeLabels(String args) throws IOException, + YarnException { + Set labels = new HashSet(); + for (String p : args.split(",")) { + labels.add(p); + } + + if (directlyAccessNodeLabelStore) { + getNodeLabelManagerInstance(getConf()).removeFromClusterNodeLabels(labels); + } else { + ResourceManagerAdministrationProtocol adminProtocol = + createAdminProtocol(); + RemoveFromClusterNodeLabelsRequest request = + RemoveFromClusterNodeLabelsRequest.newInstance(labels); + adminProtocol.removeFromClusterNodeLabels(request); + } + + return 0; + } + + private int getNodeToLabels() throws IOException, YarnException { + Map> nodeToLabels = null; + + if (directlyAccessNodeLabelStore) { + nodeToLabels = getNodeLabelManagerInstance(getConf()).getNodeLabels(); + } else { + ResourceManagerAdministrationProtocol adminProtocol = + createAdminProtocol(); + + nodeToLabels = + adminProtocol.getNodeToLabels(GetNodesToLabelsRequest.newInstance()) + .getNodeToLabels(); + } + for (NodeId host : sortNodeIdSet(nodeToLabels.keySet())) { + System.out.println(String.format("Host=%s, Node-labels=[%s]", + (host.getPort() == 0 ? host.getHost() : host.toString()), + StringUtils.join(sortStrSet(nodeToLabels.get(host)), ","))); + } + return 0; + } + + private int getClusterNodeLabels() throws IOException, YarnException { + Set labels = null; + if (directlyAccessNodeLabelStore) { + labels = getNodeLabelManagerInstance(getConf()).getClusterNodeLabels(); + } else { + ResourceManagerAdministrationProtocol adminProto = createAdminProtocol(); + labels = + adminProto.getClusterNodeLabels( + GetClusterNodeLabelsRequest.newInstance()).getNodeLabels(); + } + + System.out.println(String.format("Node-labels=%s", + StringUtils.join(sortStrSet(labels).iterator(), ","))); + return 0; + } + + private List sortNodeIdSet(Set nodes) { + List list = new ArrayList(); + list.addAll(nodes); + Collections.sort(list); + return list; + } + + private List sortStrSet(Set labels) { + List list = new ArrayList(); + list.addAll(labels); + Collections.sort(list); + return list; + } + + private Map> buildNodeLabelsFromStr(String args) + throws IOException { + Map> map = new HashMap>(); + + for (String nodeToLabels : args.split("[ \n]")) { + nodeToLabels = nodeToLabels.trim(); + if (nodeToLabels.isEmpty() || nodeToLabels.startsWith("#")) { + continue; + } + + String[] splits = nodeToLabels.split(","); + String nodeIdStr = splits[0]; + + if (nodeIdStr.trim().isEmpty()) { + throw new IOException("node name cannot be empty"); + } + + String nodeName; + int port; + if (nodeIdStr.contains(":")) { + nodeName = nodeIdStr.substring(0, nodeIdStr.indexOf(":")); + port = Integer.valueOf(nodeIdStr.substring(nodeIdStr.indexOf(":"))); + } else { + nodeName = nodeIdStr; + port = 0; + } + + NodeId nodeId = NodeId.newInstance(nodeName, port); + + map.put(nodeId, new HashSet()); + + for (int i = 1; i < splits.length; i++) { + if (!splits[i].trim().isEmpty()) { + map.get(nodeId).add(splits[i].trim().toLowerCase()); + } + } + } + + return map; + } + + private int replaceLabelsOnNodes(String args) throws IOException, + YarnException { + Map> map = buildNodeLabelsFromStr(args); + return replaceLabelsOnNodes(map); + } + + private int replaceLabelsOnNodes(Map> map) + throws IOException, YarnException { + if (directlyAccessNodeLabelStore) { + getNodeLabelManagerInstance(getConf()).replaceLabelsOnNode(map); + } else { + ResourceManagerAdministrationProtocol adminProtocol = + createAdminProtocol(); + ReplaceLabelsOnNodeRequest request = + ReplaceLabelsOnNodeRequest.newInstance(map); + adminProtocol.replaceLabelsOnNode(request); + } + return 0; + } + @Override public int run(String[] args) throws Exception { + // -directlyAccessNodeLabelStore is a additional option for node label + // access, so just search if we have specified this option, and remove it + List argsList = new ArrayList(); + for (int i = 0; i < args.length; i++) { + if (args[i].equals("-directlyAccessNodeLabelStore")) { + directlyAccessNodeLabelStore = true; + } else { + argsList.add(args[i]); + } + } + args = argsList.toArray(new String[0]); + YarnConfiguration yarnConf = getConf() == null ? new YarnConfiguration() : new YarnConfiguration( getConf()); @@ -351,6 +572,31 @@ public int run(String[] args) throws Exception { } else if ("-getGroups".equals(cmd)) { String[] usernames = Arrays.copyOfRange(args, i, args.length); exitCode = getGroups(usernames); + } else if ("-addToClusterNodeLabels".equals(cmd)) { + if (i >= args.length) { + System.err.println("No cluster node-labels are specified"); + exitCode = -1; + } else { + exitCode = addToClusterNodeLabels(args[i]); + } + } else if ("-removeFromClusterNodeLabels".equals(cmd)) { + if (i >= args.length) { + System.err.println("No cluster node-labels are specified"); + exitCode = -1; + } else { + exitCode = removeFromClusterNodeLabels(args[i]); + } + } else if ("-replaceLabelsOnNode".equals(cmd)) { + if (i >= args.length) { + System.err.println("No cluster node-labels are specified"); + exitCode = -1; + } else { + exitCode = replaceLabelsOnNodes(args[i]); + } + } else if ("-getNodeToLabels".equals(cmd)) { + exitCode = getNodeToLabels(); + } else if ("-getClusterNodeLabels".equals(cmd)) { + exitCode = getClusterNodeLabels(); } else { exitCode = -1; System.err.println(cmd.substring(1) + ": Unknown command"); @@ -380,6 +626,9 @@ public int run(String[] args) throws Exception { System.err.println(cmd.substring(1) + ": " + e.getLocalizedMessage()); } + if (null != localNodeLabelsManager) { + localNodeLabelsManager.stop(); + } return exitCode; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java deleted file mode 100644 index 419b9ae..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java +++ /dev/null @@ -1,371 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.client; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.argThat; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.never; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.PrintStream; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ha.HAServiceProtocol; -import org.apache.hadoop.ha.HAServiceStatus; -import org.apache.hadoop.ha.HAServiceTarget; -import org.apache.hadoop.yarn.client.cli.RMAdminCLI; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; -import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentMatcher; - -public class TestRMAdminCLI { - - private ResourceManagerAdministrationProtocol admin; - private HAServiceProtocol haadmin; - private RMAdminCLI rmAdminCLI; - private RMAdminCLI rmAdminCLIWithHAEnabled; - - @Before - public void configure() throws IOException { - admin = mock(ResourceManagerAdministrationProtocol.class); - - haadmin = mock(HAServiceProtocol.class); - when(haadmin.getServiceStatus()).thenReturn(new HAServiceStatus( - HAServiceProtocol.HAServiceState.INITIALIZING)); - - final HAServiceTarget haServiceTarget = mock(HAServiceTarget.class); - when(haServiceTarget.getProxy(any(Configuration.class), anyInt())) - .thenReturn(haadmin); - rmAdminCLI = new RMAdminCLI(new Configuration()) { - - @Override - protected ResourceManagerAdministrationProtocol createAdminProtocol() - throws IOException { - return admin; - } - - @Override - protected HAServiceTarget resolveTarget(String rmId) { - return haServiceTarget; - } - }; - - YarnConfiguration conf = new YarnConfiguration(); - conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); - rmAdminCLIWithHAEnabled = new RMAdminCLI(conf) { - - @Override - protected ResourceManagerAdministrationProtocol createAdminProtocol() - throws IOException { - return admin; - } - - @Override - protected HAServiceTarget resolveTarget(String rmId) { - return haServiceTarget; - } - }; - } - - @Test(timeout=500) - public void testRefreshQueues() throws Exception { - String[] args = { "-refreshQueues" }; - assertEquals(0, rmAdminCLI.run(args)); - verify(admin).refreshQueues(any(RefreshQueuesRequest.class)); - } - - @Test(timeout=500) - public void testRefreshUserToGroupsMappings() throws Exception { - String[] args = { "-refreshUserToGroupsMappings" }; - assertEquals(0, rmAdminCLI.run(args)); - verify(admin).refreshUserToGroupsMappings( - any(RefreshUserToGroupsMappingsRequest.class)); - } - - @Test(timeout=500) - public void testRefreshSuperUserGroupsConfiguration() throws Exception { - String[] args = { "-refreshSuperUserGroupsConfiguration" }; - assertEquals(0, rmAdminCLI.run(args)); - verify(admin).refreshSuperUserGroupsConfiguration( - any(RefreshSuperUserGroupsConfigurationRequest.class)); - } - - @Test(timeout=500) - public void testRefreshAdminAcls() throws Exception { - String[] args = { "-refreshAdminAcls" }; - assertEquals(0, rmAdminCLI.run(args)); - verify(admin).refreshAdminAcls(any(RefreshAdminAclsRequest.class)); - } - - @Test(timeout=500) - public void testRefreshServiceAcl() throws Exception { - String[] args = { "-refreshServiceAcl" }; - assertEquals(0, rmAdminCLI.run(args)); - verify(admin).refreshServiceAcls(any(RefreshServiceAclsRequest.class)); - } - - @Test(timeout=500) - public void testRefreshNodes() throws Exception { - String[] args = { "-refreshNodes" }; - assertEquals(0, rmAdminCLI.run(args)); - verify(admin).refreshNodes(any(RefreshNodesRequest.class)); - } - - @Test(timeout=500) - public void testGetGroups() throws Exception { - when(admin.getGroupsForUser(eq("admin"))).thenReturn( - new String[] {"group1", "group2"}); - PrintStream origOut = System.out; - PrintStream out = mock(PrintStream.class); - System.setOut(out); - try { - String[] args = { "-getGroups", "admin" }; - assertEquals(0, rmAdminCLI.run(args)); - verify(admin).getGroupsForUser(eq("admin")); - verify(out).println(argThat(new ArgumentMatcher() { - @Override - public boolean matches(Object argument) { - return ("" + argument).equals("admin : group1 group2"); - } - })); - } finally { - System.setOut(origOut); - } - } - - @Test(timeout = 500) - public void testTransitionToActive() throws Exception { - String[] args = {"-transitionToActive", "rm1"}; - - // RM HA is disabled. - // transitionToActive should not be executed - assertEquals(-1, rmAdminCLI.run(args)); - verify(haadmin, never()).transitionToActive( - any(HAServiceProtocol.StateChangeRequestInfo.class)); - - // Now RM HA is enabled. - // transitionToActive should be executed - assertEquals(0, rmAdminCLIWithHAEnabled.run(args)); - verify(haadmin).transitionToActive( - any(HAServiceProtocol.StateChangeRequestInfo.class)); - } - - @Test(timeout = 500) - public void testTransitionToStandby() throws Exception { - String[] args = {"-transitionToStandby", "rm1"}; - - // RM HA is disabled. - // transitionToStandby should not be executed - assertEquals(-1, rmAdminCLI.run(args)); - verify(haadmin, never()).transitionToStandby( - any(HAServiceProtocol.StateChangeRequestInfo.class)); - - // Now RM HA is enabled. - // transitionToActive should be executed - assertEquals(0, rmAdminCLIWithHAEnabled.run(args)); - verify(haadmin).transitionToStandby( - any(HAServiceProtocol.StateChangeRequestInfo.class)); - } - - @Test(timeout = 500) - public void testGetServiceState() throws Exception { - String[] args = {"-getServiceState", "rm1"}; - - // RM HA is disabled. - // getServiceState should not be executed - assertEquals(-1, rmAdminCLI.run(args)); - verify(haadmin, never()).getServiceStatus(); - - // Now RM HA is enabled. - // getServiceState should be executed - assertEquals(0, rmAdminCLIWithHAEnabled.run(args)); - verify(haadmin).getServiceStatus(); - } - - @Test(timeout = 500) - public void testCheckHealth() throws Exception { - String[] args = {"-checkHealth", "rm1"}; - - // RM HA is disabled. - // getServiceState should not be executed - assertEquals(-1, rmAdminCLI.run(args)); - verify(haadmin, never()).monitorHealth(); - - // Now RM HA is enabled. - // getServiceState should be executed - assertEquals(0, rmAdminCLIWithHAEnabled.run(args)); - verify(haadmin).monitorHealth(); - } - - /** - * Test printing of help messages - */ - @Test(timeout=500) - public void testHelp() throws Exception { - PrintStream oldOutPrintStream = System.out; - PrintStream oldErrPrintStream = System.err; - ByteArrayOutputStream dataOut = new ByteArrayOutputStream(); - ByteArrayOutputStream dataErr = new ByteArrayOutputStream(); - System.setOut(new PrintStream(dataOut)); - System.setErr(new PrintStream(dataErr)); - try { - String[] args = { "-help" }; - assertEquals(0, rmAdminCLI.run(args)); - oldOutPrintStream.println(dataOut); - assertTrue(dataOut - .toString() - .contains( - "rmadmin is the command to execute YARN administrative commands.")); - assertTrue(dataOut - .toString() - .contains( - "yarn rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper" + - "UserGroupsConfiguration] [-refreshUserToGroupsMappings] " + - "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup" + - " [username]] [-help [cmd]]")); - assertTrue(dataOut - .toString() - .contains( - "-refreshQueues: Reload the queues' acls, states and scheduler " + - "specific properties.")); - assertTrue(dataOut - .toString() - .contains( - "-refreshNodes: Refresh the hosts information at the " + - "ResourceManager.")); - assertTrue(dataOut.toString().contains( - "-refreshUserToGroupsMappings: Refresh user-to-groups mappings")); - assertTrue(dataOut - .toString() - .contains( - "-refreshSuperUserGroupsConfiguration: Refresh superuser proxy" + - " groups mappings")); - assertTrue(dataOut - .toString() - .contains( - "-refreshAdminAcls: Refresh acls for administration of " + - "ResourceManager")); - assertTrue(dataOut - .toString() - .contains( - "-refreshServiceAcl: Reload the service-level authorization" + - " policy file")); - assertTrue(dataOut - .toString() - .contains( - "-help [cmd]: Displays help for the given command or all " + - "commands if none")); - - testError(new String[] { "-help", "-refreshQueues" }, - "Usage: yarn rmadmin [-refreshQueues]", dataErr, 0); - testError(new String[] { "-help", "-refreshNodes" }, - "Usage: yarn rmadmin [-refreshNodes]", dataErr, 0); - testError(new String[] { "-help", "-refreshUserToGroupsMappings" }, - "Usage: yarn rmadmin [-refreshUserToGroupsMappings]", dataErr, 0); - testError( - new String[] { "-help", "-refreshSuperUserGroupsConfiguration" }, - "Usage: yarn rmadmin [-refreshSuperUserGroupsConfiguration]", - dataErr, 0); - testError(new String[] { "-help", "-refreshAdminAcls" }, - "Usage: yarn rmadmin [-refreshAdminAcls]", dataErr, 0); - testError(new String[] { "-help", "-refreshServiceAcl" }, - "Usage: yarn rmadmin [-refreshServiceAcl]", dataErr, 0); - testError(new String[] { "-help", "-getGroups" }, - "Usage: yarn rmadmin [-getGroups [username]]", dataErr, 0); - testError(new String[] { "-help", "-transitionToActive" }, - "Usage: yarn rmadmin [-transitionToActive " + - " [--forceactive]]", dataErr, 0); - testError(new String[] { "-help", "-transitionToStandby" }, - "Usage: yarn rmadmin [-transitionToStandby ]", dataErr, 0); - testError(new String[] { "-help", "-getServiceState" }, - "Usage: yarn rmadmin [-getServiceState ]", dataErr, 0); - testError(new String[] { "-help", "-checkHealth" }, - "Usage: yarn rmadmin [-checkHealth ]", dataErr, 0); - testError(new String[] { "-help", "-failover" }, - "Usage: yarn rmadmin " + - "[-failover [--forcefence] [--forceactive] " + - " ]", - dataErr, 0); - - testError(new String[] { "-help", "-badParameter" }, - "Usage: yarn rmadmin", dataErr, 0); - testError(new String[] { "-badParameter" }, - "badParameter: Unknown command", dataErr, -1); - - // Test -help when RM HA is enabled - assertEquals(0, rmAdminCLIWithHAEnabled.run(args)); - oldOutPrintStream.println(dataOut); - assertTrue(dataOut - .toString() - .contains( - "yarn rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper" + - "UserGroupsConfiguration] [-refreshUserToGroupsMappings] " + - "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup" + - " [username]] [-help [cmd]] [-transitionToActive " + - " [--forceactive]] [-transitionToStandby ] [-failover" + - " [--forcefence] [--forceactive] ] " + - "[-getServiceState ] [-checkHealth ]")); - } finally { - System.setOut(oldOutPrintStream); - System.setErr(oldErrPrintStream); - } - } - - @Test(timeout=500) - public void testException() throws Exception { - PrintStream oldErrPrintStream = System.err; - ByteArrayOutputStream dataErr = new ByteArrayOutputStream(); - System.setErr(new PrintStream(dataErr)); - try { - when(admin.refreshQueues(any(RefreshQueuesRequest.class))) - .thenThrow(new IOException("test exception")); - String[] args = { "-refreshQueues" }; - - assertEquals(-1, rmAdminCLI.run(args)); - verify(admin).refreshQueues(any(RefreshQueuesRequest.class)); - assertTrue(dataErr.toString().contains("refreshQueues: test exception")); - } finally { - System.setErr(oldErrPrintStream); - } - } - - private void testError(String[] args, String template, - ByteArrayOutputStream data, int resultCode) throws Exception { - assertEquals(resultCode, rmAdminCLI.run(args)); - assertTrue(data.toString().contains(template)); - data.reset(); - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java new file mode 100644 index 0000000..b4d29b6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java @@ -0,0 +1,517 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.cli; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceStatus; +import org.apache.hadoop.ha.HAServiceTarget; +import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.client.cli.RMAdminCLI; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.nodelabels.DummyCommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.google.common.collect.ImmutableSet; + +public class TestRMAdminCLI { + + private ResourceManagerAdministrationProtocol admin; + private HAServiceProtocol haadmin; + private RMAdminCLI rmAdminCLI; + private RMAdminCLI rmAdminCLIWithHAEnabled; + private CommonNodeLabelsManager dummyNodeLabelsManager; + private boolean remoteAdminServiceAccessed = false; + + @SuppressWarnings("static-access") + @Before + public void configure() throws IOException, YarnException { + remoteAdminServiceAccessed = false; + dummyNodeLabelsManager = new DummyCommonNodeLabelsManager(); + admin = mock(ResourceManagerAdministrationProtocol.class); + when(admin.addToClusterNodeLabels(any(AddToClusterNodeLabelsRequest.class))) + .thenAnswer(new Answer() { + + @Override + public AddToClusterNodeLabelsResponse answer( + InvocationOnMock invocation) throws Throwable { + remoteAdminServiceAccessed = true; + return AddToClusterNodeLabelsResponse.newInstance(); + } + }); + + haadmin = mock(HAServiceProtocol.class); + when(haadmin.getServiceStatus()).thenReturn(new HAServiceStatus( + HAServiceProtocol.HAServiceState.INITIALIZING)); + + final HAServiceTarget haServiceTarget = mock(HAServiceTarget.class); + when(haServiceTarget.getProxy(any(Configuration.class), anyInt())) + .thenReturn(haadmin); + rmAdminCLI = new RMAdminCLI(new Configuration()) { + @Override + protected ResourceManagerAdministrationProtocol createAdminProtocol() + throws IOException { + return admin; + } + + @Override + protected HAServiceTarget resolveTarget(String rmId) { + return haServiceTarget; + } + }; + rmAdminCLI.localNodeLabelsManager = dummyNodeLabelsManager; + + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + rmAdminCLIWithHAEnabled = new RMAdminCLI(conf) { + + @Override + protected ResourceManagerAdministrationProtocol createAdminProtocol() + throws IOException { + return admin; + } + + @Override + protected HAServiceTarget resolveTarget(String rmId) { + return haServiceTarget; + } + }; + } + + @Test(timeout=500) + public void testRefreshQueues() throws Exception { + String[] args = { "-refreshQueues" }; + assertEquals(0, rmAdminCLI.run(args)); + verify(admin).refreshQueues(any(RefreshQueuesRequest.class)); + } + + @Test(timeout=500) + public void testRefreshUserToGroupsMappings() throws Exception { + String[] args = { "-refreshUserToGroupsMappings" }; + assertEquals(0, rmAdminCLI.run(args)); + verify(admin).refreshUserToGroupsMappings( + any(RefreshUserToGroupsMappingsRequest.class)); + } + + @Test(timeout=500) + public void testRefreshSuperUserGroupsConfiguration() throws Exception { + String[] args = { "-refreshSuperUserGroupsConfiguration" }; + assertEquals(0, rmAdminCLI.run(args)); + verify(admin).refreshSuperUserGroupsConfiguration( + any(RefreshSuperUserGroupsConfigurationRequest.class)); + } + + @Test(timeout=500) + public void testRefreshAdminAcls() throws Exception { + String[] args = { "-refreshAdminAcls" }; + assertEquals(0, rmAdminCLI.run(args)); + verify(admin).refreshAdminAcls(any(RefreshAdminAclsRequest.class)); + } + + @Test(timeout=500) + public void testRefreshServiceAcl() throws Exception { + String[] args = { "-refreshServiceAcl" }; + assertEquals(0, rmAdminCLI.run(args)); + verify(admin).refreshServiceAcls(any(RefreshServiceAclsRequest.class)); + } + + @Test(timeout=500) + public void testRefreshNodes() throws Exception { + String[] args = { "-refreshNodes" }; + assertEquals(0, rmAdminCLI.run(args)); + verify(admin).refreshNodes(any(RefreshNodesRequest.class)); + } + + @Test(timeout=500) + public void testGetGroups() throws Exception { + when(admin.getGroupsForUser(eq("admin"))).thenReturn( + new String[] {"group1", "group2"}); + PrintStream origOut = System.out; + PrintStream out = mock(PrintStream.class); + System.setOut(out); + try { + String[] args = { "-getGroups", "admin" }; + assertEquals(0, rmAdminCLI.run(args)); + verify(admin).getGroupsForUser(eq("admin")); + verify(out).println(argThat(new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return ("" + argument).equals("admin : group1 group2"); + } + })); + } finally { + System.setOut(origOut); + } + } + + @Test(timeout = 500) + public void testTransitionToActive() throws Exception { + String[] args = {"-transitionToActive", "rm1"}; + + // RM HA is disabled. + // transitionToActive should not be executed + assertEquals(-1, rmAdminCLI.run(args)); + verify(haadmin, never()).transitionToActive( + any(HAServiceProtocol.StateChangeRequestInfo.class)); + + // Now RM HA is enabled. + // transitionToActive should be executed + assertEquals(0, rmAdminCLIWithHAEnabled.run(args)); + verify(haadmin).transitionToActive( + any(HAServiceProtocol.StateChangeRequestInfo.class)); + } + + @Test(timeout = 500) + public void testTransitionToStandby() throws Exception { + String[] args = {"-transitionToStandby", "rm1"}; + + // RM HA is disabled. + // transitionToStandby should not be executed + assertEquals(-1, rmAdminCLI.run(args)); + verify(haadmin, never()).transitionToStandby( + any(HAServiceProtocol.StateChangeRequestInfo.class)); + + // Now RM HA is enabled. + // transitionToActive should be executed + assertEquals(0, rmAdminCLIWithHAEnabled.run(args)); + verify(haadmin).transitionToStandby( + any(HAServiceProtocol.StateChangeRequestInfo.class)); + } + + @Test(timeout = 500) + public void testGetServiceState() throws Exception { + String[] args = {"-getServiceState", "rm1"}; + + // RM HA is disabled. + // getServiceState should not be executed + assertEquals(-1, rmAdminCLI.run(args)); + verify(haadmin, never()).getServiceStatus(); + + // Now RM HA is enabled. + // getServiceState should be executed + assertEquals(0, rmAdminCLIWithHAEnabled.run(args)); + verify(haadmin).getServiceStatus(); + } + + @Test(timeout = 500) + public void testCheckHealth() throws Exception { + String[] args = {"-checkHealth", "rm1"}; + + // RM HA is disabled. + // getServiceState should not be executed + assertEquals(-1, rmAdminCLI.run(args)); + verify(haadmin, never()).monitorHealth(); + + // Now RM HA is enabled. + // getServiceState should be executed + assertEquals(0, rmAdminCLIWithHAEnabled.run(args)); + verify(haadmin).monitorHealth(); + } + + /** + * Test printing of help messages + */ + @Test(timeout=500) + public void testHelp() throws Exception { + PrintStream oldOutPrintStream = System.out; + PrintStream oldErrPrintStream = System.err; + ByteArrayOutputStream dataOut = new ByteArrayOutputStream(); + ByteArrayOutputStream dataErr = new ByteArrayOutputStream(); + System.setOut(new PrintStream(dataOut)); + System.setErr(new PrintStream(dataErr)); + try { + String[] args = { "-help" }; + assertEquals(0, rmAdminCLI.run(args)); + oldOutPrintStream.println(dataOut); + assertTrue(dataOut + .toString() + .contains( + "rmadmin is the command to execute YARN administrative commands.")); + assertTrue(dataOut + .toString() + .contains( + "yarn rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper" + + "UserGroupsConfiguration] [-refreshUserToGroupsMappings] " + + "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup" + + " [username]] [-help [cmd]]")); + assertTrue(dataOut + .toString() + .contains( + "-refreshQueues: Reload the queues' acls, states and scheduler " + + "specific properties.")); + assertTrue(dataOut + .toString() + .contains( + "-refreshNodes: Refresh the hosts information at the " + + "ResourceManager.")); + assertTrue(dataOut.toString().contains( + "-refreshUserToGroupsMappings: Refresh user-to-groups mappings")); + assertTrue(dataOut + .toString() + .contains( + "-refreshSuperUserGroupsConfiguration: Refresh superuser proxy" + + " groups mappings")); + assertTrue(dataOut + .toString() + .contains( + "-refreshAdminAcls: Refresh acls for administration of " + + "ResourceManager")); + assertTrue(dataOut + .toString() + .contains( + "-refreshServiceAcl: Reload the service-level authorization" + + " policy file")); + assertTrue(dataOut + .toString() + .contains( + "-help [cmd]: Displays help for the given command or all " + + "commands if none")); + + testError(new String[] { "-help", "-refreshQueues" }, + "Usage: yarn rmadmin [-refreshQueues]", dataErr, 0); + testError(new String[] { "-help", "-refreshNodes" }, + "Usage: yarn rmadmin [-refreshNodes]", dataErr, 0); + testError(new String[] { "-help", "-refreshUserToGroupsMappings" }, + "Usage: yarn rmadmin [-refreshUserToGroupsMappings]", dataErr, 0); + testError( + new String[] { "-help", "-refreshSuperUserGroupsConfiguration" }, + "Usage: yarn rmadmin [-refreshSuperUserGroupsConfiguration]", + dataErr, 0); + testError(new String[] { "-help", "-refreshAdminAcls" }, + "Usage: yarn rmadmin [-refreshAdminAcls]", dataErr, 0); + testError(new String[] { "-help", "-refreshServiceAcl" }, + "Usage: yarn rmadmin [-refreshServiceAcl]", dataErr, 0); + testError(new String[] { "-help", "-getGroups" }, + "Usage: yarn rmadmin [-getGroups [username]]", dataErr, 0); + testError(new String[] { "-help", "-transitionToActive" }, + "Usage: yarn rmadmin [-transitionToActive " + + " [--forceactive]]", dataErr, 0); + testError(new String[] { "-help", "-transitionToStandby" }, + "Usage: yarn rmadmin [-transitionToStandby ]", dataErr, 0); + testError(new String[] { "-help", "-getServiceState" }, + "Usage: yarn rmadmin [-getServiceState ]", dataErr, 0); + testError(new String[] { "-help", "-checkHealth" }, + "Usage: yarn rmadmin [-checkHealth ]", dataErr, 0); + testError(new String[] { "-help", "-failover" }, + "Usage: yarn rmadmin " + + "[-failover [--forcefence] [--forceactive] " + + " ]", + dataErr, 0); + + testError(new String[] { "-help", "-badParameter" }, + "Usage: yarn rmadmin", dataErr, 0); + testError(new String[] { "-badParameter" }, + "badParameter: Unknown command", dataErr, -1); + + // Test -help when RM HA is enabled + assertEquals(0, rmAdminCLIWithHAEnabled.run(args)); + oldOutPrintStream.println(dataOut); + assertTrue(dataOut + .toString() + .contains( + "yarn rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper" + + "UserGroupsConfiguration] [-refreshUserToGroupsMappings] " + + "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup" + + " [username]] [-help [cmd]] [-transitionToActive " + + " [--forceactive]] [-transitionToStandby ] [-failover" + + " [--forcefence] [--forceactive] ] " + + "[-getServiceState ] [-checkHealth ]")); + } finally { + System.setOut(oldOutPrintStream); + System.setErr(oldErrPrintStream); + } + } + + @Test(timeout=500) + public void testException() throws Exception { + PrintStream oldErrPrintStream = System.err; + ByteArrayOutputStream dataErr = new ByteArrayOutputStream(); + System.setErr(new PrintStream(dataErr)); + try { + when(admin.refreshQueues(any(RefreshQueuesRequest.class))) + .thenThrow(new IOException("test exception")); + String[] args = { "-refreshQueues" }; + + assertEquals(-1, rmAdminCLI.run(args)); + verify(admin).refreshQueues(any(RefreshQueuesRequest.class)); + assertTrue(dataErr.toString().contains("refreshQueues: test exception")); + } finally { + System.setErr(oldErrPrintStream); + } + } + + @Test + public void testAccessLocalNodeLabelManager() throws Exception { + assertFalse(dummyNodeLabelsManager.getServiceState() == STATE.STOPPED); + + String[] args = + { "-addToClusterNodeLabels", "x,y", "-directlyAccessNodeLabelStore" }; + assertEquals(0, rmAdminCLI.run(args)); + assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().containsAll( + ImmutableSet.of("x", "y"))); + + // reset localNodeLabelsManager + dummyNodeLabelsManager.removeFromClusterNodeLabels(ImmutableSet.of("x", "y")); + + // change the sequence of "-directlyAccessNodeLabelStore" and labels, + // should not matter + args = + new String[] { "-addToClusterNodeLabels", + "-directlyAccessNodeLabelStore", "x,y" }; + assertEquals(0, rmAdminCLI.run(args)); + assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().containsAll( + ImmutableSet.of("x", "y"))); + + // local node labels manager will be close after running + assertTrue(dummyNodeLabelsManager.getServiceState() == STATE.STOPPED); + } + + @Test + public void testAccessRemoteNodeLabelManager() throws Exception { + String[] args = + { "-addToClusterNodeLabels", "x,y" }; + assertEquals(0, rmAdminCLI.run(args)); + + // localNodeLabelsManager shouldn't accessed + assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().isEmpty()); + + // remote node labels manager accessed + assertTrue(remoteAdminServiceAccessed); + } + + @Test + public void testAddToClusterNodeLabels() throws Exception { + // successfully add labels + String[] args = + { "-addToClusterNodeLabels", "x", "-directlyAccessNodeLabelStore" }; + assertEquals(0, rmAdminCLI.run(args)); + assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().containsAll( + ImmutableSet.of("x"))); + + // no labels, should fail + args = new String[] { "-addToClusterNodeLabels" }; + assertTrue(0 != rmAdminCLI.run(args)); + + // no labels, should fail + args = + new String[] { "-addToClusterNodeLabels", + "-directlyAccessNodeLabelStore" }; + assertTrue(0 != rmAdminCLI.run(args)); + } + + @Test + public void testRemoveFromClusterNodeLabels() throws Exception { + // Successfully remove labels + dummyNodeLabelsManager.addToCluserNodeLabels(ImmutableSet.of("x")); + String[] args = + { "-removeFromClusterNodeLabels", "x", "-directlyAccessNodeLabelStore" }; + assertEquals(0, rmAdminCLI.run(args)); + assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().isEmpty()); + + // no labels, should fail + args = new String[] { "-removeFromClusterNodeLabels" }; + assertTrue(0 != rmAdminCLI.run(args)); + + // no labels, should fail + args = + new String[] { "-removeFromClusterNodeLabels", + "-directlyAccessNodeLabelStore" }; + assertTrue(0 != rmAdminCLI.run(args)); + } + + @Test + public void testReplaceLabelsOnNode() throws Exception { + // Successfully replace labels + dummyNodeLabelsManager.addToCluserNodeLabels(ImmutableSet.of("x", "y")); + String[] args = + { "-replaceLabelsOnNode", "node1,x,y node2,y", + "-directlyAccessNodeLabelStore" }; + assertEquals(0, rmAdminCLI.run(args)); + assertTrue(dummyNodeLabelsManager.getNodeLabels().containsKey( + NodeId.newInstance("node1", 0))); + assertTrue(dummyNodeLabelsManager.getNodeLabels().containsKey( + NodeId.newInstance("node2", 0))); + + // no labels, should fail + args = new String[] { "-replaceLabelsOnNode" }; + assertTrue(0 != rmAdminCLI.run(args)); + + // no labels, should fail + args = + new String[] { "-replaceLabelsOnNode", + "-directlyAccessNodeLabelStore" }; + assertTrue(0 != rmAdminCLI.run(args)); + } + + @Test + public void testGetClusterNodeLabels() throws Exception { + // Successfully get labels + String[] args = + { "-getClusterNodeLabels", + "-directlyAccessNodeLabelStore" }; + assertEquals(0, rmAdminCLI.run(args)); + } + + @Test + public void testGetNodeToLabels() throws Exception { + // Successfully get node-to-labels + String[] args = + { "-getNodeToLabels", + "-directlyAccessNodeLabelStore" }; + assertEquals(0, rmAdminCLI.run(args)); + } + + private void testError(String[] args, String template, + ByteArrayOutputStream data, int resultCode) throws Exception { + assertEquals(resultCode, rmAdminCLI.run(args)); + assertTrue(data.toString().contains(template)); + data.reset(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java index 8bb88f2..12a0864 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java @@ -236,7 +236,11 @@ protected void stopDispatcher() { protected void serviceStop() throws Exception { // finalize store stopDispatcher(); - store.close(); + + // only close store when we enabled store persistent + if (null != store) { + store.close(); + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java index ccffaed..e4828e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java @@ -29,17 +29,28 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetClusterNodeLabelsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetNodesToLabelsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshSuperUserGroupsConfigurationRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -52,8 +63,18 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl; @@ -66,6 +87,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshSuperUserGroupsConfigurationResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl; @@ -205,5 +230,75 @@ public UpdateNodeResourceResponse updateNodeResource( return null; } } - + + @Override + public AddToClusterNodeLabelsResponse addToClusterNodeLabels( + AddToClusterNodeLabelsRequest request) throws YarnException, IOException { + AddToClusterNodeLabelsRequestProto requestProto = + ((AddToClusterNodeLabelsRequestPBImpl) request).getProto(); + try { + return new AddToClusterNodeLabelsResponsePBImpl( + proxy.addToClusterNodeLabels(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( + RemoveFromClusterNodeLabelsRequest request) throws YarnException, + IOException { + RemoveFromClusterNodeLabelsRequestProto requestProto = + ((RemoveFromClusterNodeLabelsRequestPBImpl) request).getProto(); + try { + return new RemoveFromClusterNodeLabelsResponsePBImpl( + proxy.removeFromClusterNodeLabels(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( + ReplaceLabelsOnNodeRequest request) throws YarnException, IOException { + ReplaceLabelsOnNodeRequestProto requestProto = + ((ReplaceLabelsOnNodeRequestPBImpl) request).getProto(); + try { + return new ReplaceLabelsOnNodeResponsePBImpl(proxy.replaceLabelsOnNodes( + null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels(GetNodesToLabelsRequest request) + throws YarnException, IOException { + GetNodesToLabelsRequestProto requestProto = + ((GetNodesToLabelsRequestPBImpl) request).getProto(); + try { + return new GetNodesToLabelsResponsePBImpl(proxy.getNodeToLabels( + null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + GetClusterNodeLabelsRequestProto requestProto = + ((GetClusterNodeLabelsRequestPBImpl) request).getProto(); + try { + return new GetClusterNodeLabelsResponsePBImpl(proxy.getClusterNodeLabels( + null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java index d1f71fe..7cfecad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java @@ -22,8 +22,14 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetClusterNodeLabelsResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetNodesToLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetNodesToLabelsResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto; @@ -36,17 +42,32 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshSuperUserGroupsConfigurationResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceResponseProto; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl; @@ -59,6 +80,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshSuperUserGroupsConfigurationResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl; @@ -204,4 +229,86 @@ public UpdateNodeResourceResponseProto updateNodeResource(RpcController controll } } + @Override + public AddToClusterNodeLabelsResponseProto addToClusterNodeLabels( + RpcController controller, AddToClusterNodeLabelsRequestProto proto) + throws ServiceException { + AddToClusterNodeLabelsRequestPBImpl request = + new AddToClusterNodeLabelsRequestPBImpl(proto); + try { + AddToClusterNodeLabelsResponse response = + real.addToClusterNodeLabels(request); + return ((AddToClusterNodeLabelsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public RemoveFromClusterNodeLabelsResponseProto removeFromClusterNodeLabels( + RpcController controller, RemoveFromClusterNodeLabelsRequestProto proto) + throws ServiceException { + RemoveFromClusterNodeLabelsRequestPBImpl request = + new RemoveFromClusterNodeLabelsRequestPBImpl(proto); + try { + RemoveFromClusterNodeLabelsResponse response = + real.removeFromClusterNodeLabels(request); + return ((RemoveFromClusterNodeLabelsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public ReplaceLabelsOnNodeResponseProto replaceLabelsOnNodes( + RpcController controller, ReplaceLabelsOnNodeRequestProto proto) + throws ServiceException { + ReplaceLabelsOnNodeRequestPBImpl request = + new ReplaceLabelsOnNodeRequestPBImpl(proto); + try { + ReplaceLabelsOnNodeResponse response = real.replaceLabelsOnNode(request); + return ((ReplaceLabelsOnNodeResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetNodesToLabelsResponseProto getNodeToLabels( + RpcController controller, GetNodesToLabelsRequestProto proto) + throws ServiceException { + GetNodesToLabelsRequestPBImpl request = + new GetNodesToLabelsRequestPBImpl(proto); + try { + GetNodesToLabelsResponse response = real.getNodeToLabels(request); + return ((GetNodesToLabelsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetClusterNodeLabelsResponseProto getClusterNodeLabels( + RpcController controller, GetClusterNodeLabelsRequestProto proto) + throws ServiceException { + GetClusterNodeLabelsRequestPBImpl request = + new GetClusterNodeLabelsRequestPBImpl(proto); + try { + GetClusterNodeLabelsResponse response = + real.getClusterNodeLabels(request); + return ((GetClusterNodeLabelsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java index fcdf969..65ea79f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java @@ -78,4 +78,9 @@ protected void startDispatcher() { protected void stopDispatcher() { // do nothing } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 2b7797f..ecdcb20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -57,6 +57,12 @@ import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -69,8 +75,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; @@ -618,4 +630,104 @@ public AccessControlList getAccessControlList() { public Server getServer() { return this.server; } + + @Override + public AddToClusterNodeLabelsResponse addToClusterNodeLabels(AddToClusterNodeLabelsRequest request) + throws YarnException, IOException { + String argName = "addToClusterNodeLabels"; + UserGroupInformation user = checkAcls(argName); + + if (!isRMActive()) { + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", + "ResourceManager is not active. Can not add labels."); + throwStandbyException(); + } + + AddToClusterNodeLabelsResponse response = + recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class); + try { + rmContext.getNodeLabelManager().addToCluserNodeLabels(request.getNodeLabels()); + RMAuditLogger + .logSuccess(user.getShortUserName(), argName, "AdminService"); + return response; + } catch (IOException ioe) { + LOG.info("Exception add labels", ioe); + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", "Exception add label"); + throw RPCUtil.getRemoteException(ioe); + } + } + + @Override + public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( + RemoveFromClusterNodeLabelsRequest request) throws YarnException, IOException { + String argName = "removeFromClusterNodeLabels"; + UserGroupInformation user = checkAcls(argName); + + if (!isRMActive()) { + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", + "ResourceManager is not active. Can not remove labels."); + throwStandbyException(); + } + + RemoveFromClusterNodeLabelsResponse response = + recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class); + try { + rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels()); + RMAuditLogger + .logSuccess(user.getShortUserName(), argName, "AdminService"); + return response; + } catch (IOException ioe) { + LOG.info("Exception remove labels", ioe); + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", "Exception remove label"); + throw RPCUtil.getRemoteException(ioe); + } + } + + @Override + public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( + ReplaceLabelsOnNodeRequest request) throws YarnException, IOException { + String argName = "replaceLabelsOnNode"; + UserGroupInformation user = checkAcls(argName); + + if (!isRMActive()) { + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", + "ResourceManager is not active. Can not set node to labels."); + throwStandbyException(); + } + + ReplaceLabelsOnNodeResponse response = + recordFactory.newRecordInstance(ReplaceLabelsOnNodeResponse.class); + try { + rmContext.getNodeLabelManager().replaceLabelsOnNode( + request.getNodeToLabels()); + RMAuditLogger + .logSuccess(user.getShortUserName(), argName, "AdminService"); + return response; + } catch (IOException ioe) { + LOG.info("Exception set node to labels. ", ioe); + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", + "Exception set node to labels."); + throw RPCUtil.getRemoteException(ioe); + } + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels(GetNodesToLabelsRequest request) + throws YarnException, IOException { + return GetNodesToLabelsResponsePBImpl.newInstance(rmContext + .getNodeLabelManager().getNodeLabels()); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels(GetClusterNodeLabelsRequest request) + throws YarnException, IOException { + return GetClusterNodeLabelsResponsePBImpl.newInstance(rmContext.getNodeLabelManager() + .getClusterNodeLabels()); + } }