diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReplaceLabelsOnNodeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReplaceLabelsOnNodeRequest.java index 28e261a..5fe912d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReplaceLabelsOnNodeRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReplaceLabelsOnNodeRequest.java @@ -44,4 +44,12 @@ public static ReplaceLabelsOnNodeRequest newInstance( @Public @Evolving public abstract Map> getNodeToLabels(); + + @Public + @Evolving + public abstract void setVerifyNodes(boolean verifyNodes); + + @Public + @Evolving + public abstract boolean getVerifyNodes(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index b9f30db..f46704e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -99,10 +99,10 @@ message RemoveFromClusterNodeLabelsResponseProto { message ReplaceLabelsOnNodeRequestProto { repeated NodeIdToLabelsNameProto nodeToLabels = 1; + optional bool verifyNodes = 2; } message ReplaceLabelsOnNodeResponseProto { - } message UpdateNodeLabelsResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index 7a898a1..110706c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -130,11 +130,13 @@ new UsageInfo(" (label splitted by \",\")", "remove from cluster node labels")) .put("-replaceLabelsOnNode", - new UsageInfo( + new UsageInfo("[--fail-on-unknown-nodes] " + "<\"node1[:port]=label1,label2 node2[:port]=label1,label2\">", - "replace labels on nodes" - + " (please note that we do not support specifying multiple" - + " labels on a single host for now.)")) + "replace labels on nodes" + + " (please note that we do not support specifying multiple" + + " labels on a single host for now.)\n\t\t" + + "[--fail-on-unknown-nodes] is optional, when we set this" + + " option, it will fail if specified nodes are unknown.")) .put("-directlyAccessNodeLabelStore", new UsageInfo("", "This is DEPRECATED, will be removed in future releases. Directly access node label store, " + "with this option, all node label related operations" @@ -246,8 +248,8 @@ private static void printHelp(String cmd, boolean isHAEnabled) { " [-addToClusterNodeLabels <\"label1(exclusive=true)," + "label2(exclusive=false),label3\">]" + " [-removeFromClusterNodeLabels ]" + - " [-replaceLabelsOnNode <\"node1[:port]=label1,label2" + - " node2[:port]=label1\">]" + + " [-replaceLabelsOnNode [--fail-on-unknown-nodes] " + + "<\"node1[:port]=label1,label2 node2[:port]=label1\">]" + " [-directlyAccessNodeLabelStore]" + " [-refreshClusterMaxPriority]" + " [-updateNodeResource [NodeID] [MemSize] [vCores]" + @@ -302,7 +304,7 @@ protected ResourceManagerAdministrationProtocol createAdminProtocol() return ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class); } - + private int refreshQueues() throws IOException, YarnException { // Refresh the queue properties ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); @@ -657,14 +659,14 @@ private int removeFromClusterNodeLabels(String args) throws IOException, return map; } - private int replaceLabelsOnNodes(String args) throws IOException, - YarnException { + private int replaceLabelsOnNodes(String args, boolean verifyNodes) + throws IOException, YarnException { Map> map = buildNodeLabelsMapFromStr(args); - return replaceLabelsOnNodes(map); + return replaceLabelsOnNodes(map, verifyNodes); } - private int replaceLabelsOnNodes(Map> map) - throws IOException, YarnException { + private int replaceLabelsOnNodes(Map> map, + boolean verifyNodes) throws IOException, YarnException { if (directlyAccessNodeLabelStore) { getNodeLabelManagerInstance(getConf()).replaceLabelsOnNode(map); } else { @@ -672,11 +674,12 @@ private int replaceLabelsOnNodes(Map> map) createAdminProtocol(); ReplaceLabelsOnNodeRequest request = ReplaceLabelsOnNodeRequest.newInstance(map); + request.setVerifyNodes(verifyNodes); adminProtocol.replaceLabelsOnNode(request); } return 0; } - + @Override public int run(String[] args) throws Exception { // -directlyAccessNodeLabelStore is a additional option for node label @@ -783,8 +786,16 @@ public int run(String[] args) throws Exception { System.err.println(NO_MAPPING_ERR_MSG); printUsage("", isHAEnabled); exitCode = -1; + } else if ("--fail-on-unknown-nodes".equals(args[i])) { + if (i + 1 >= args.length) { + System.err.println(NO_MAPPING_ERR_MSG); + printUsage("", isHAEnabled); + exitCode = -1; + } else { + exitCode = replaceLabelsOnNodes(args[i + 1], true); + } } else { - exitCode = replaceLabelsOnNodes(args[i]); + exitCode = replaceLabelsOnNodes(args[i], false); } } else { exitCode = -1; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java index bea6e39..c23ad6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java @@ -469,7 +469,7 @@ public void testHelp() throws Exception { "[username]] [-addToClusterNodeLabels " + "<\"label1(exclusive=true),label2(exclusive=false),label3\">] " + "[-removeFromClusterNodeLabels ] " + - "[-replaceLabelsOnNode " + + "[-replaceLabelsOnNode [--fail-on-unknown-nodes] " + "<\"node1[:port]=label1,label2 node2[:port]=label1\">] " + "[-directlyAccessNodeLabelStore] [-refreshClusterMaxPriority] " + "[-updateNodeResource [NodeID] [MemSize] [vCores] " + @@ -564,6 +564,7 @@ public void testHelp() throws Exception { + " [username]] [-addToClusterNodeLabels <\"label1(exclusive=true)," + "label2(exclusive=false),label3\">]" + " [-removeFromClusterNodeLabels ] [-replaceLabelsOnNode " + + "[--fail-on-unknown-nodes] " + "<\"node1[:port]=label1,label2 node2[:port]=label1\">] [-directlyAccessNodeLabelStore] " + "[-refreshClusterMaxPriority] " + "[-updateNodeResource [NodeID] [MemSize] [vCores] " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReplaceLabelsOnNodeRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReplaceLabelsOnNodeRequestPBImpl.java index 22e561c..6a13018 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReplaceLabelsOnNodeRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReplaceLabelsOnNodeRequestPBImpl.java @@ -146,10 +146,22 @@ public void setNodeToLabels(Map> map) { nodeIdToLabels.putAll(map); } + @Override + public boolean getVerifyNodes() { + ReplaceLabelsOnNodeRequestProtoOrBuilder p = viaProto ? proto : builder; + return p.getVerifyNodes(); + } + + @Override + public void setVerifyNodes(boolean verifyNodes) { + maybeInitBuilder(); + builder.setVerifyNodes(verifyNodes); + } + private NodeIdProto convertToProtoFormat(NodeId t) { return ((NodeIdPBImpl) t).getProto(); } - + @Override public int hashCode() { assert false : "hashCode not designed"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index db55264..794915f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -21,6 +21,9 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Set; @@ -806,6 +809,32 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( ReplaceLabelsOnNodeResponse response = recordFactory.newRecordInstance(ReplaceLabelsOnNodeResponse.class); + + if (request.getVerifyNodes()) { + // verify if nodes have registered to RM + List unknownNodes = new ArrayList<>(); + for (NodeId requestedNode : request.getNodeToLabels().keySet()) { + boolean isKnown = false; + for (NodeId knownNode : rmContext.getRMNodes().keySet()) { + if (isNodeSame(requestedNode, knownNode)) { + isKnown = true; + break; + } + } + if (!isKnown) { + unknownNodes.add(requestedNode); + } + } + + if (!unknownNodes.isEmpty()) { + RMAuditLogger.logFailure(user.getShortUserName(), operation, "", + "AdminService", "Replace labels on unknown nodes:" + + Arrays.toString(unknownNodes.toArray())); + throw RPCUtil.getRemoteException(new IOException( + "Replace labels on unknown nodes:" + + Arrays.toString(unknownNodes.toArray()))); + } + } try { rmContext.getNodeLabelManager().replaceLabelsOnNode( request.getNodeToLabels()); @@ -817,6 +846,18 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( } } + private boolean isNodeSame(NodeId requestedNode, NodeId knownNode) { + if (!requestedNode.getHost().equals(knownNode.getHost())) { + return false; + } else if (requestedNode.getPort() != 0) { + // requested node port is specified + return requestedNode.getPort() == knownNode.getPort(); + } else { + // requested node port is not specified + return true; + } + } + private void checkRMStatus(String user, String operation, String msg) throws StandbyException { if (!isRMActive()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java index 0b65c0b..ddceb14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java @@ -28,6 +28,7 @@ import java.io.PrintWriter; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -64,9 +65,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -1086,6 +1087,88 @@ public void testModifyLabelsOnNodesWithCentralizedConfigurationDisabled() } @Test + public void testModifyLabelsOnUnknownNodes() throws IOException, + YarnException { + // create RM and set it's ACTIVE, and set distributed node label + // configuration to true + rm = new MockRM(); + + ((RMContextImpl) rm.getRMContext()) + .setHAServiceState(HAServiceState.ACTIVE); + Map rmNodes = rm.getRMContext().getRMNodes(); + rmNodes.put(NodeId.newInstance("host1", 1111), + new RMNodeImpl(null, rm.getRMContext(), "host1", 0, 0, null, null, + null)); + rmNodes.put(NodeId.newInstance("host2", 2222), + new RMNodeImpl(null, rm.getRMContext(), "host2", 0, 0, null, null, + null)); + rmNodes.put(NodeId.newInstance("host3", 3333), + new RMNodeImpl(null, rm.getRMContext(), "host3", 0, 0, null, null, + null)); + RMNodeLabelsManager labelMgr = rm.rmContext.getNodeLabelManager(); + + // by default, distributed configuration for node label is disabled, this + // should pass + labelMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", + "y")); + // replace known node + ReplaceLabelsOnNodeRequest request1 = ReplaceLabelsOnNodeRequest + .newInstance(ImmutableMap.of(NodeId.newInstance("host1", 1111), + (Set) ImmutableSet.of("x"))); + request1.setVerifyNodes(true); + try { + rm.adminService.replaceLabelsOnNode(request1); + } catch (Exception ex) { + fail("should not fail on known node"); + } + + // replace known node with wildcard port + ReplaceLabelsOnNodeRequest request2 = ReplaceLabelsOnNodeRequest + .newInstance(ImmutableMap.of(NodeId.newInstance("host1", 0), + (Set) ImmutableSet.of("x"))); + request2.setVerifyNodes(true); + try { + rm.adminService.replaceLabelsOnNode(request2); + } catch (Exception ex) { + fail("should not fail on known node"); + } + + // replace unknown node + ReplaceLabelsOnNodeRequest request3 = ReplaceLabelsOnNodeRequest + .newInstance(ImmutableMap.of(NodeId.newInstance("host4", 0), + (Set) ImmutableSet.of("x"))); + request3.setVerifyNodes(true); + try { + rm.adminService.replaceLabelsOnNode(request3); + fail("Should fail on unknown node"); + } catch (Exception ex) { + } + + // replace known node but wrong port + ReplaceLabelsOnNodeRequest request4 = ReplaceLabelsOnNodeRequest + .newInstance(ImmutableMap.of(NodeId.newInstance("host2", 1111), + (Set) ImmutableSet.of("x"))); + request4.setVerifyNodes(true); + try { + rm.adminService.replaceLabelsOnNode(request4); + fail("Should fail on node with wrong port"); + } catch (Exception ex) { + } + + // replace non-exist node but not check + ReplaceLabelsOnNodeRequest request5 = ReplaceLabelsOnNodeRequest + .newInstance(ImmutableMap.of(NodeId.newInstance("host5", 0), + (Set) ImmutableSet.of("x"))); + request5.setVerifyNodes(false); + try { + rm.adminService.replaceLabelsOnNode(request5); + } catch (Exception ex) { + fail("Should not fail on unknown node when not verify nodes"); + } + rm.close(); + } + + @Test public void testRemoveClusterNodeLabelsWithCentralizedConfigurationDisabled() throws IOException, YarnException { // create RM and set it's ACTIVE