diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
index f621aa209a5..ffbdb396f09 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
@@ -64,6 +64,26 @@ public static RemoteNode newInstance(NodeId nodeId, String httpAddress,
     return remoteNode;
   }
 
+  /**
+   * Create new Instance.
+   * @param nodeId NodeId.
+   * @param httpAddress Http address.
+   * @param rackName Rack Name.
+   * @param nodePartition Node Partition.
+   * @return RemoteNode Instance.
+   */
+  @Private
+  @Unstable
+  public static RemoteNode newInstance(NodeId nodeId, String httpAddress,
+      String rackName, String nodePartition) {
+    RemoteNode remoteNode = Records.newRecord(RemoteNode.class);
+    remoteNode.setNodeId(nodeId);
+    remoteNode.setHttpAddress(httpAddress);
+    remoteNode.setRackName(rackName);
+    remoteNode.setNodePartition(nodePartition);
+    return remoteNode;
+  }
+
   /**
    * Get {@link NodeId}.
    * @return NodeId.
@@ -117,6 +137,23 @@ public static RemoteNode newInstance(NodeId nodeId, String httpAddress,
    * @param other RemoteNode.
    * @return Comparison.
    */
+
+  /**
+   * Get Node Partition.
+   * @return Node Partition.
+   */
+  @Private
+  @Unstable
+  public abstract String getNodePartition();
+
+  /**
+   * Set Node Partition.
+   * @param nodePartition Node Partition.
+   */
+  @Private
+  @Unstable
+  public abstract void setNodePartition(String nodePartition);
+
   @Override
   public int compareTo(RemoteNode other) {
     return this.getNodeId().compareTo(other.getNodeId());
@@ -127,6 +164,7 @@ public String toString() {
     return "RemoteNode{" +
         "nodeId=" + getNodeId() + ", " +
         "rackName=" + getRackName() + ", " +
-        "httpAddress=" + getHttpAddress() + "}";
+        "httpAddress=" + getHttpAddress() + ", " +
+        "partition=" + getNodePartition() + "}";
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
index c2492cf4663..4d57e7a2152 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
@@ -136,6 +136,25 @@ public void setRackName(String rackName) {
     builder.setRackName(rackName);
   }
 
+  @Override
+  public String getNodePartition() {
+    RemoteNodeProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasNodePartition()) {
+      return "";
+    }
+    return (p.getNodePartition());
+  }
+
+  @Override
+  public void setNodePartition(String nodePartition) {
+    maybeInitBuilder();
+    if (nodePartition == null) {
+      builder.clearNodePartition();
+      return;
+    }
+    builder.setNodePartition(nodePartition);
+  }
+
   @Override
   public int hashCode() {
     return getProto().hashCode();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 1f53648d55d..9a1bd918258 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.scheduler;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -461,10 +462,17 @@ private void allocateContainersInternal(long rmIdentifier,
   private Collection<RemoteNode> findNodeCandidates(int loopIndex,
       Map<String, RemoteNode> allNodes, Set<String> blackList,
       EnrichedResourceRequest enrichedRR) {
+    LinkedList<RemoteNode> retList = new LinkedList<>();
+    String partition = getRequestPartition(enrichedRR);
     if (loopIndex > 1) {
-      return allNodes.values();
+      for (RemoteNode remoteNode : allNodes.values()) {
+        if (StringUtils.equals(partition, remoteNode.getNodePartition())) {
+          retList.add(remoteNode);
+        }
+      }
+      return retList;
     } else {
-      LinkedList<RemoteNode> retList = new LinkedList<>();
+
       int numContainers = enrichedRR.getRequest().getNumContainers();
       while (numContainers > 0) {
         if (loopIndex == 0) {
@@ -489,8 +497,10 @@ private void allocateContainersInternal(long rmIdentifier,
   private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes,
       EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList,
       Set<String> blackList, int numContainers) {
+    String partition = getRequestPartition(enrichedRR);
     for (RemoteNode rNode : allNodes.values()) {
-      if (enrichedRR.getRackLocations().contains(rNode.getRackName())) {
+      if (StringUtils.equals(partition, rNode.getNodePartition()) &&
+          enrichedRR.getRackLocations().contains(rNode.getRackName())) {
         if (blackList.contains(rNode.getNodeId().getHost())) {
           retList.addLast(rNode);
         } else {
@@ -508,9 +518,11 @@ private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes,
   private int collectNodeLocalCandidates(Map<String, RemoteNode> allNodes,
       EnrichedResourceRequest enrichedRR, List<RemoteNode> retList,
       int numContainers) {
+    String partition = getRequestPartition(enrichedRR);
     for (String nodeName : enrichedRR.getNodeLocations()) {
       RemoteNode remoteNode = allNodes.get(nodeName);
-      if (remoteNode != null) {
+      if (remoteNode != null &&
+          StringUtils.equals(partition, remoteNode.getNodePartition())) {
         retList.add(remoteNode);
         numContainers--;
       }
@@ -563,7 +575,7 @@ private Container createContainer(long rmIdentifier, long tokenExpiry,
         capability, currTime + tokenExpiry,
         tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier,
         schedulerKey.getPriority(), currTime,
-        null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
+        null, node.getNodePartition(), ContainerType.TASK,
         ExecutionType.OPPORTUNISTIC, schedulerKey.getAllocationRequestId());
     byte[] pwd =
         tokenSecretManager.createPassword(containerTokenIdentifier);
@@ -616,4 +628,12 @@ public PartitionedResourceRequests partitionAskList(
     }
     return partitionedRequests;
   }
+
+  private String getRequestPartition(EnrichedResourceRequest enrichedRR) {
+    String partition = enrichedRR.getRequest().getNodeLabelExpression();
+    if (partition == null) {
+      partition = CommonNodeLabelsManager.NO_LABEL;
+    }
+    return partition;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 1b090bf232b..5153cfb33df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -31,6 +31,7 @@ message RemoteNodeProto {
   optional NodeIdProto node_id = 1;
   optional string http_address = 2;
   optional string rack_name = 3;
+  optional string node_partition = 4;
 }
 
 message RegisterDistributedSchedulingAMResponseProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
index 788b0b386a8..b6f49ac58e8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
@@ -596,4 +596,38 @@ public void testLotsOfContainersRackLocalAllocation()
     }
     Assert.assertEquals(100, containers.size());
   }
+
+  @Test
+  public void testAllocationWithNodeLabels() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<>(), new ArrayList<>());
+    List<ResourceRequest> reqs =
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+            "*", Resources.createResource(1 * GB), 1, true, "label",
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    oppCntxt.updateNodeList(
+        Arrays.asList(
+            RemoteNode.newInstance(
+                NodeId.newInstance("h1", 1234), "h1:1234", "/r1")));
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    Assert.assertEquals(0, containers.size());
+    Assert.assertEquals(1, oppCntxt.getOutstandingOpReqs().size());
+
+    oppCntxt.updateNodeList(
+        Arrays.asList(
+            RemoteNode.newInstance(
+                NodeId.newInstance("h1", 1234), "h1:1234", "/r1",
+                "label")));
+
+    containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    Assert.assertEquals(1, containers.size());
+    Assert.assertEquals(0, oppCntxt.getOutstandingOpReqs().size());
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
index 5fb05ca0f6d..8f0cc480584 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
@@ -18,11 +18,14 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * Implementations of this class are notified of changes to the cluster's state,
@@ -37,4 +40,6 @@
   void updateNode(RMNode rmNode);
 
   void updateNodeResource(RMNode rmNode, ResourceOption resourceOption);
+
+  void updateNodeLabels(Map<NodeId, Set<String>> updatedNodeToLabels);
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index ce425dfaac4..756c680abfa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
@@ -65,6 +66,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -174,6 +176,16 @@ public void allocate(ApplicationAttemptId appAttemptId,
         appAttempt.getOpportunisticContainerContext();
     oppCtx.updateNodeList(getLeastLoadedNodes());
 
+    if (!partitionedAsks.getOpportunistic().isEmpty()) {
+      String appPartition = appAttempt.getAppAMNodePartitionName();
+
+      for (ResourceRequest req : partitionedAsks.getOpportunistic()) {
+        if (null == req.getNodeLabelExpression()) {
+          req.setNodeLabelExpression(appPartition);
+        }
+      }
+    }
+
     List<Container> oppContainers =
         oppContainerAllocator.allocateContainers(
             request.getResourceBlacklistRequest(),
@@ -378,7 +390,15 @@ public void handle(SchedulerEvent event) {
       nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
           nodeResourceUpdatedEvent.getResourceOption());
       break;
-
+    case NODE_LABELS_UPDATE:
+      if (!(event instanceof NodeLabelsUpdateSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeLabelsUpdateSchedulerEvent nodeLabelsUpdatedEvent =
+          (NodeLabelsUpdateSchedulerEvent) event;
+      nodeMonitor.updateNodeLabels(
+          nodeLabelsUpdatedEvent.getUpdatedNodeToLabels());
+      break;
     // <-- IGNORED EVENTS : START -->
     case APP_ADDED:
       break;
@@ -390,8 +410,6 @@ public void handle(SchedulerEvent event) {
       break;
     case CONTAINER_EXPIRED:
       break;
-    case NODE_LABELS_UPDATE:
-      break;
     case RELEASE_CONTAINER:
       break;
     // <-- IGNORED EVENTS : END -->
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
index e9890999454..d4cdbed3c9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
@@ -23,9 +23,11 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 import java.util.ArrayList;
@@ -33,6 +35,7 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -76,6 +79,7 @@ public int getMetric(ClusterNode c) {
     double timestamp;
     final NodeId nodeId;
     private int queueCapacity = 0;
+    private String partition = RMNodeLabelsManager.NO_LABEL;
 
     public ClusterNode(NodeId nodeId) {
       this.nodeId = nodeId;
@@ -102,6 +106,11 @@ public ClusterNode setQueueCapacity(int capacity) {
       return this;
     }
 
+    public ClusterNode setPartition(String partition) {
+      this.partition = partition;
+      return this;
+    }
+
     public boolean isQueueFull() {
       return this.queueCapacity > 0 &&
           this.queueLength >= this.queueCapacity;
@@ -236,7 +245,8 @@ public void updateNode(RMNode rmNode) {
           new ClusterNode(rmNode.getNodeID())
              .setQueueWaitTime(estimatedQueueWaitTime)
              .setQueueLength(waitQueueLength)
-             .setQueueCapacity(opportQueueCapacity));
+             .setQueueCapacity(opportQueueCapacity)
+             .setPartition(getPartition(rmNode)));
       LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " +
           "with queue wait time [" + estimatedQueueWaitTime + "] and " +
           "wait queue length [" + waitQueueLength + "]");
@@ -251,7 +261,8 @@ public void updateNode(RMNode rmNode) {
         currentNode
             .setQueueWaitTime(estimatedQueueWaitTime)
            .setQueueLength(waitQueueLength)
-            .updateTimestamp();
+            .updateTimestamp()
+            .setPartition(getPartition(rmNode));
         if (LOG.isDebugEnabled()) {
           LOG.debug("Updating ClusterNode [" + rmNode.getNodeID() + "] " +
               "with queue wait time [" + estimatedQueueWaitTime + "] and " +
@@ -269,6 +280,22 @@ public void updateNode(RMNode rmNode) {
     }
   }
 
+  @Override
+  public void updateNodeLabels(Map<NodeId, Set<String>> updatedNodeToLabels) {
+    for (Map.Entry<NodeId, Set<String>> entry : updatedNodeToLabels.entrySet()) {
+      NodeId nodeId = entry.getKey();
+      ClusterNode node = this.clusterNodes.get(nodeId);
+      if (node != null) {
+        Set<String> labels = entry.getValue();
+        if (labels == null || labels.isEmpty()) {
+          node.setPartition(CommonNodeLabelsManager.NO_LABEL);
+        } else {
+          node.setPartition(labels.iterator().next());
+        }
+      }
+    }
+  }
+
   @Override
   public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) {
     LOG.debug("Node resource update event from: " + rmNode.getNodeID());
@@ -327,4 +354,18 @@ public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) {
     }
   }
 
+  /**
+   * Get the partition the node belongs to. If the node-labels of this node are
+   * empty or null, it belongs to the NO_LABEL partition. Since we only support
+   * one partition for each node (YARN-2694), the first label will be its partition.
+   * @return Partition for the node.
+   */
+  public String getPartition(RMNode node) {
+    Set<String> labels = node.getNodeLabels();
+    if (labels == null || labels.isEmpty()) {
+      return RMNodeLabelsManager.NO_LABEL;
+    } else {
+      return labels.iterator().next();
+    }
+  }
 }
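
Illustrative usage (not part of the patch): a minimal sketch of how the new four-argument RemoteNode.newInstance overload and the empty-partition fallback behave, assuming hadoop-yarn-server-common with this patch is on the classpath; the host names, ports and the "gpu" label are made up for illustration.

import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;

public class RemoteNodePartitionSketch {
  public static void main(String[] args) {
    // Node registered in a hypothetical "gpu" partition.
    RemoteNode labeled = RemoteNode.newInstance(
        NodeId.newInstance("host1", 1234), "host1:8042", "/rack1", "gpu");

    // Node created through the existing three-argument overload; its
    // partition defaults to the empty string, i.e. the NO_LABEL partition.
    RemoteNode unlabeled = RemoteNode.newInstance(
        NodeId.newInstance("host2", 1234), "host2:8042", "/rack1");

    // The allocator compares the request's node-label expression (falling
    // back to CommonNodeLabelsManager.NO_LABEL when unset) against each
    // node's partition, so only "labeled" is a candidate for a "gpu" ask.
    System.out.println(labeled.getNodePartition());                 // gpu
    System.out.println(unlabeled.getNodePartition().isEmpty());     // true
    System.out.println(CommonNodeLabelsManager.NO_LABEL.isEmpty()); // true
  }
}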