diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java index 4b777ea..41d4fb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java @@ -30,6 +30,12 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -42,6 +48,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; @@ -110,4 +120,34 @@ public RefreshServiceAclsResponse refreshServiceAcls( public UpdateNodeResourceResponse updateNodeResource( UpdateNodeResourceRequest request) throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public AddToClusterNodeLabelsResponse addToClusterNodeLabels(AddToClusterNodeLabelsRequest request) + throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( + RemoveFromClusterNodeLabelsRequest request) throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( + ReplaceLabelsOnNodeRequest request) throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto index 47a6cf7..74cff17 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto @@ -39,4 +39,9 @@ service ResourceManagerAdministrationProtocolService { rpc refreshServiceAcls(RefreshServiceAclsRequestProto) returns (RefreshServiceAclsResponseProto); rpc getGroupsForUser(GetGroupsForUserRequestProto) returns (GetGroupsForUserResponseProto); rpc updateNodeResource (UpdateNodeResourceRequestProto) returns (UpdateNodeResourceResponseProto); + rpc addLabels(AddToClusterNodeLabelsRequestProto) returns (AddToClusterNodeLabelsResponseProto); + rpc removeLabels(RemoveFromClusterNodeLabelsRequestProto) returns (RemoveFromClusterNodeLabelsResponseProto); + rpc setNodeToLabels(ReplaceLabelsOnNodeRequestProto) returns (ReplaceLabelsOnNodeResponseProto); + rpc getNodeToLabels(GetNodesToLabelsRequestProto) returns (GetNodesToLabelsResponseProto); + rpc getLabels(GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index 50e5825..ffab6c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -19,11 +19,17 @@ package org.apache.hadoop.yarn.client.cli; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; -import com.google.common.collect.ImmutableMap; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -33,6 +39,7 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.RMHAServiceTarget; import org.apache.hadoop.yarn.conf.HAUtil; @@ -41,13 +48,21 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; + +import com.google.common.collect.ImmutableMap; @Private @Unstable @@ -55,6 +70,7 @@ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + private boolean directlyAccessNodeLabelStore = false; protected final static Map ADMIN_USAGE = ImmutableMap.builder() @@ -78,7 +94,26 @@ .put("-help", new UsageInfo("[cmd]", "Displays help for the given command or all commands if none " + "is specified.")) - .build(); + .put("-addToClusterNodeLabels", + new UsageInfo("[label1,label2,label3] (label splitted by \",\")", + "add to cluster node labels ")) + .put("-removeFromClusterNodeLabels", + new UsageInfo("[label1,label2,label3] (label splitted by \",\")", + "remove from cluster node labels")) + .put("-replaceLabelsOnNode", + new UsageInfo("[node1:port,label1,label2 node2:port,label1,label2]", + "replace labels on nodes")) + .put("-getNodeToLabels", new UsageInfo("", + "Get node to label mappings")) + .put("-getClusterNodeLabels", + new UsageInfo("", "Get node labels in the cluster")) + .put("-directlyAccessNodeLabelStore", + new UsageInfo("", "Directly access node label store, " + + "with this option, all node label related operations" + + " will not connect RM. Instead, they will" + + " access/modify stored node labels directly." + + " By default, it is false (access via RM)")) + .build(); public RMAdminCLI() { super(); @@ -201,11 +236,13 @@ private static void printUsage(String cmd, boolean isHAEnabled) { ToolRunner.printGenericCommandUsage(System.err); } - - protected ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException { + + protected ResourceManagerAdministrationProtocol createAdminProtocol() + throws IOException { // Get the current configuration final YarnConfiguration conf = new YarnConfiguration(getConf()); - return ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class); + return ClientRMProxy.createRMProxy(conf, + ResourceManagerAdministrationProtocol.class); } private int refreshQueues() throws IOException, YarnException { @@ -285,6 +322,181 @@ private int getGroups(String[] usernames) throws IOException { return 0; } + private CommonNodeLabelsManager getLocalNodeLabelManagerInstance() + throws IOException { + CommonNodeLabelsManager localMgr = new CommonNodeLabelsManager(); + + localMgr.init(getConf()); + localMgr.start(); + + return localMgr; + } + + private int addToClusterNodeLabels(String args) throws IOException, YarnException { + Set labels = new HashSet(); + for (String p : args.split(",")) { + labels.add(p); + } + + return addToClusterNodeLabels(labels); + } + + private int addToClusterNodeLabels(Set labels) throws IOException, + YarnException { + if (directlyAccessNodeLabelStore) { + CommonNodeLabelsManager mgr = getLocalNodeLabelManagerInstance(); + mgr.addToCluserNodeLabels(labels); + mgr.stop(); + } else { + ResourceManagerAdministrationProtocol adminProtocol = + createAdminProtocol(); + AddToClusterNodeLabelsRequest request = + AddToClusterNodeLabelsRequest.newInstance(labels); + adminProtocol.addToClusterNodeLabels(request); + } + return 0; + } + + private int removeFromClusterNodeLabels(String args) throws IOException, + YarnException { + Set labels = new HashSet(); + for (String p : args.split(",")) { + labels.add(p); + } + + if (directlyAccessNodeLabelStore) { + CommonNodeLabelsManager mgr = getLocalNodeLabelManagerInstance(); + mgr.removeFromClusterNodeLabels(labels); + mgr.stop(); + } else { + ResourceManagerAdministrationProtocol adminProtocol = + createAdminProtocol(); + RemoveFromClusterNodeLabelsRequest request = + RemoveFromClusterNodeLabelsRequest.newInstance(labels); + adminProtocol.removeFromClusterNodeLabels(request); + } + + return 0; + } + + private int getNodeToLabels() throws IOException, YarnException { + Map> nodeToLabels = null; + + if (directlyAccessNodeLabelStore) { + CommonNodeLabelsManager mgr = getLocalNodeLabelManagerInstance(); + nodeToLabels = mgr.getNodeLabels(); + mgr.stop(); + } else { + ResourceManagerAdministrationProtocol adminProtocol = + createAdminProtocol(); + + nodeToLabels = + adminProtocol.getNodeToLabels(GetNodesToLabelsRequest.newInstance()) + .getNodeToLabels(); + } + for (NodeId host : sortNodeIdSet(nodeToLabels.keySet())) { + System.out.println(String.format("Host=%s, Labels=[%s]", + (host.getPort() == 0 ? host.getHost() : host.toString()), + StringUtils.join(sortStrSet(nodeToLabels.get(host)), ","))); + } + return 0; + } + + private int getClusterNodeLabels() throws IOException, YarnException { + Set labels = null; + if (directlyAccessNodeLabelStore) { + CommonNodeLabelsManager mgr = getLocalNodeLabelManagerInstance(); + labels = mgr.getClusterNodeLabels(); + mgr.stop(); + } else { + ResourceManagerAdministrationProtocol adminProto = createAdminProtocol(); + labels = + adminProto.getClusterNodeLabels( + GetClusterNodeLabelsRequest.newInstance()).getNodeLabels(); + } + + System.out.println(String.format("Labels=%s", + StringUtils.join(sortStrSet(labels).iterator(), ","))); + return 0; + } + + private List sortNodeIdSet(Set nodes) { + List list = new ArrayList(); + list.addAll(nodes); + Collections.sort(list); + return list; + } + + private List sortStrSet(Set labels) { + List list = new ArrayList(); + list.addAll(labels); + Collections.sort(list); + return list; + } + + private Map> buildNodeLabelsFromStr(String args) + throws IOException { + Map> map = new HashMap>(); + + for (String nodeToLabels : args.split("[ \n]")) { + nodeToLabels = nodeToLabels.trim(); + if (nodeToLabels.isEmpty() || nodeToLabels.startsWith("#")) { + continue; + } + + String[] splits = nodeToLabels.split(","); + String nodeIdStr = splits[0]; + + if (nodeIdStr.trim().isEmpty()) { + throw new IOException("node name cannot be empty"); + } + + String nodeName; + int port; + if (nodeIdStr.contains(":")) { + nodeName = nodeIdStr.substring(0, nodeIdStr.indexOf(":")); + port = Integer.valueOf(nodeIdStr.substring(nodeIdStr.indexOf(":"))); + } else { + nodeName = nodeIdStr; + port = 0; + } + + NodeId nodeId = NodeId.newInstance(nodeName, port); + + map.put(nodeId, new HashSet()); + + for (int i = 1; i < splits.length; i++) { + if (!splits[i].trim().isEmpty()) { + map.get(nodeId).add(splits[i].trim().toLowerCase()); + } + } + } + + return map; + } + + private int replaceLabelsOnNodes(String args) throws IOException, + YarnException { + Map> map = buildNodeLabelsFromStr(args); + return replaceLabelsOnNodes(map); + } + + private int replaceLabelsOnNodes(Map> map) + throws IOException, YarnException { + if (directlyAccessNodeLabelStore) { + CommonNodeLabelsManager mgr = getLocalNodeLabelManagerInstance(); + mgr.replaceLabelsOnNode(map); + mgr.stop(); + } else { + ResourceManagerAdministrationProtocol adminProtocol = + createAdminProtocol(); + ReplaceLabelsOnNodeRequest request = + ReplaceLabelsOnNodeRequest.newInstance(map); + adminProtocol.replaceLabelsOnNode(request); + } + return 0; + } + @Override public int run(String[] args) throws Exception { YarnConfiguration yarnConf = @@ -351,6 +563,33 @@ public int run(String[] args) throws Exception { } else if ("-getGroups".equals(cmd)) { String[] usernames = Arrays.copyOfRange(args, i, args.length); exitCode = getGroups(usernames); + } else if ("-addToClusterNodeLabels".equals(cmd)) { + if (i >= args.length) { + System.err.println("Labels is not specified"); + exitCode = -1; + } else { + exitCode = addToClusterNodeLabels(args[i]); + } + } else if ("-removeFromClusterNodeLabels".equals(cmd)) { + if (i >= args.length) { + System.err.println("arg is not specified"); + exitCode = -1; + } else { + exitCode = removeFromClusterNodeLabels(args[i]); + } + } else if ("-replaceLabelsOnNode".equals(cmd)) { + if (i >= args.length) { + System.err.println("arg is not specified"); + exitCode = -1; + } else { + exitCode = replaceLabelsOnNodes(args[i]); + } + } else if ("-getNodeToLabels".equals(cmd)) { + exitCode = getNodeToLabels(); + } else if ("-getClusterNodeLabels".equals(cmd)) { + exitCode = getClusterNodeLabels(); + } else if ("-directlyAccessNodeLabelStore".equals(cmd)) { + directlyAccessNodeLabelStore = true; } else { exitCode = -1; System.err.println(cmd.substring(1) + ": Unknown command"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java index ccffaed..5a0b344 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java @@ -29,17 +29,28 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetClusterNodeLabelsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetNodesToLabelsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshSuperUserGroupsConfigurationRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -52,8 +63,18 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl; @@ -66,6 +87,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshSuperUserGroupsConfigurationResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl; @@ -205,5 +230,75 @@ public UpdateNodeResourceResponse updateNodeResource( return null; } } - + + @Override + public AddToClusterNodeLabelsResponse addToClusterNodeLabels( + AddToClusterNodeLabelsRequest request) throws YarnException, IOException { + AddToClusterNodeLabelsRequestProto requestProto = + ((AddToClusterNodeLabelsRequestPBImpl) request).getProto(); + try { + return new AddToClusterNodeLabelsResponsePBImpl(proxy.addLabels(null, + requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( + RemoveFromClusterNodeLabelsRequest request) throws YarnException, + IOException { + RemoveFromClusterNodeLabelsRequestProto requestProto = + ((RemoveFromClusterNodeLabelsRequestPBImpl) request).getProto(); + try { + return new RemoveFromClusterNodeLabelsResponsePBImpl(proxy.removeLabels( + null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( + ReplaceLabelsOnNodeRequest request) throws YarnException, IOException { + ReplaceLabelsOnNodeRequestProto requestProto = + ((ReplaceLabelsOnNodeRequestPBImpl) request).getProto(); + try { + return new ReplaceLabelsOnNodeResponsePBImpl(proxy.setNodeToLabels( + null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels(GetNodesToLabelsRequest request) + throws YarnException, IOException { + GetNodesToLabelsRequestProto requestProto = + ((GetNodesToLabelsRequestPBImpl) request).getProto(); + try { + return new GetNodesToLabelsResponsePBImpl(proxy.getNodeToLabels( + null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels(GetClusterNodeLabelsRequest request) + throws YarnException, IOException { + GetClusterNodeLabelsRequestProto requestProto = + ((GetClusterNodeLabelsRequestPBImpl) request).getProto(); + try { + return new GetClusterNodeLabelsResponsePBImpl(proxy.getLabels( + null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java index d1f71fe..a7406e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java @@ -22,8 +22,14 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetClusterNodeLabelsResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetNodesToLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetNodesToLabelsResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto; @@ -36,17 +42,32 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshSuperUserGroupsConfigurationResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceResponseProto; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl; @@ -59,6 +80,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshSuperUserGroupsConfigurationResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl; @@ -204,4 +229,79 @@ public UpdateNodeResourceResponseProto updateNodeResource(RpcController controll } } + @Override + public AddToClusterNodeLabelsResponseProto addLabels(RpcController controller, + AddToClusterNodeLabelsRequestProto proto) throws ServiceException { + AddToClusterNodeLabelsRequestPBImpl request = new AddToClusterNodeLabelsRequestPBImpl(proto); + try { + AddToClusterNodeLabelsResponse response = real.addToClusterNodeLabels(request); + return ((AddToClusterNodeLabelsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public RemoveFromClusterNodeLabelsResponseProto removeLabels( + RpcController controller, RemoveFromClusterNodeLabelsRequestProto proto) + throws ServiceException { + RemoveFromClusterNodeLabelsRequestPBImpl request = + new RemoveFromClusterNodeLabelsRequestPBImpl(proto); + try { + RemoveFromClusterNodeLabelsResponse response = real.removeFromClusterNodeLabels(request); + return ((RemoveFromClusterNodeLabelsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public ReplaceLabelsOnNodeResponseProto setNodeToLabels( + RpcController controller, ReplaceLabelsOnNodeRequestProto proto) + throws ServiceException { + ReplaceLabelsOnNodeRequestPBImpl request = + new ReplaceLabelsOnNodeRequestPBImpl(proto); + try { + ReplaceLabelsOnNodeResponse response = real.replaceLabelsOnNode(request); + return ((ReplaceLabelsOnNodeResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetNodesToLabelsResponseProto getNodeToLabels(RpcController controller, + GetNodesToLabelsRequestProto proto) throws ServiceException { + GetNodesToLabelsRequestPBImpl request = + new GetNodesToLabelsRequestPBImpl(proto); + try { + GetNodesToLabelsResponse response = real.getNodeToLabels(request); + return ((GetNodesToLabelsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetClusterNodeLabelsResponseProto getLabels(RpcController controller, + GetClusterNodeLabelsRequestProto proto) throws ServiceException { + GetClusterNodeLabelsRequestPBImpl request = + new GetClusterNodeLabelsRequestPBImpl(proto); + try { + GetClusterNodeLabelsResponse response = real.getClusterNodeLabels(request); + return ((GetClusterNodeLabelsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 2b7797f..4af5372 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -57,6 +57,12 @@ import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -69,8 +75,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; @@ -618,4 +630,104 @@ public AccessControlList getAccessControlList() { public Server getServer() { return this.server; } + + @Override + public AddToClusterNodeLabelsResponse addToClusterNodeLabels(AddToClusterNodeLabelsRequest request) + throws YarnException, IOException { + String argName = "addLabels"; + UserGroupInformation user = checkAcls(argName); + + if (!isRMActive()) { + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", + "ResourceManager is not active. Can not add labels."); + throwStandbyException(); + } + + AddToClusterNodeLabelsResponse response = + recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class); + try { + rmContext.getNodeLabelManager().addToCluserNodeLabels(request.getNodeLabels()); + RMAuditLogger + .logSuccess(user.getShortUserName(), argName, "AdminService"); + return response; + } catch (IOException ioe) { + LOG.info("Exception add labels", ioe); + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", "Exception add label"); + throw RPCUtil.getRemoteException(ioe); + } + } + + @Override + public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( + RemoveFromClusterNodeLabelsRequest request) throws YarnException, IOException { + String argName = "removeLabels"; + UserGroupInformation user = checkAcls(argName); + + if (!isRMActive()) { + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", + "ResourceManager is not active. Can not remove labels."); + throwStandbyException(); + } + + RemoveFromClusterNodeLabelsResponse response = + recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class); + try { + rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels()); + RMAuditLogger + .logSuccess(user.getShortUserName(), argName, "AdminService"); + return response; + } catch (IOException ioe) { + LOG.info("Exception remove labels", ioe); + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", "Exception remove label"); + throw RPCUtil.getRemoteException(ioe); + } + } + + @Override + public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( + ReplaceLabelsOnNodeRequest request) throws YarnException, IOException { + String argName = "setNodeToLabels"; + UserGroupInformation user = checkAcls(argName); + + if (!isRMActive()) { + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", + "ResourceManager is not active. Can not set node to labels."); + throwStandbyException(); + } + + ReplaceLabelsOnNodeResponse response = + recordFactory.newRecordInstance(ReplaceLabelsOnNodeResponse.class); + try { + rmContext.getNodeLabelManager().replaceLabelsOnNode( + request.getNodeToLabels()); + RMAuditLogger + .logSuccess(user.getShortUserName(), argName, "AdminService"); + return response; + } catch (IOException ioe) { + LOG.info("Exception set node to labels. ", ioe); + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", + "Exception set node to labels."); + throw RPCUtil.getRemoteException(ioe); + } + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels(GetNodesToLabelsRequest request) + throws YarnException, IOException { + return GetNodesToLabelsResponsePBImpl.newInstance(rmContext + .getNodeLabelManager().getNodeLabels()); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels(GetClusterNodeLabelsRequest request) + throws YarnException, IOException { + return GetClusterNodeLabelsResponsePBImpl.newInstance(rmContext.getNodeLabelManager() + .getClusterNodeLabels()); + } }