diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java index f621aa209a5..67ad5bac294 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java @@ -64,6 +64,26 @@ public static RemoteNode newInstance(NodeId nodeId, String httpAddress, return remoteNode; } + /** + * Create new Instance. + * @param nodeId NodeId. + * @param httpAddress Http address. + * @param rackName Rack Name. + * @param nodePartition Node Partition. + * @return RemoteNode Instance. + */ + @Private + @Unstable + public static RemoteNode newInstance(NodeId nodeId, String httpAddress, + String rackName, String nodePartition) { + RemoteNode remoteNode = Records.newRecord(RemoteNode.class); + remoteNode.setNodeId(nodeId); + remoteNode.setHttpAddress(httpAddress); + remoteNode.setRackName(rackName); + remoteNode.setNodePartition(nodePartition); + return remoteNode; + } + /** * Get {@link NodeId}. * @return NodeId. @@ -117,6 +137,23 @@ public static RemoteNode newInstance(NodeId nodeId, String httpAddress, * @param other RemoteNode. * @return Comparison. */ + + /** + * Get Node Partition. + * @return Node Partition. + */ + @Private + @Unstable + public abstract String getNodePartition(); + + /** + * Set Node Partition. + * @param nodePartition + */ + @Private + @Unstable + public abstract void setNodePartition(String nodePartition); + @Override public int compareTo(RemoteNode other) { return this.getNodeId().compareTo(other.getNodeId()); @@ -127,6 +164,7 @@ public String toString() { return "RemoteNode{" + "nodeId=" + getNodeId() + ", " + "rackName=" + getRackName() + ", " + - "httpAddress=" + getHttpAddress() + "}"; + "httpAddress=" + getHttpAddress() + ", " + + "partition=" + getNodePartition() + "}"; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java index c2492cf4663..4d57e7a2152 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java @@ -136,6 +136,25 @@ public void setRackName(String rackName) { builder.setRackName(rackName); } + @Override + public String getNodePartition() { + RemoteNodeProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasNodePartition()) { + return ""; + } + return (p.getNodePartition()); + } + + @Override + public void setNodePartition(String nodePartition) { + maybeInitBuilder(); + if (nodePartition == null) { + builder.clearNodePartition(); + return; + } + builder.setNodePartition(nodePartition); + } + @Override public int hashCode() { return getProto().hashCode(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 1f53648d55d..f31f3eb23c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.scheduler; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -461,10 +462,17 @@ private void allocateContainersInternal(long rmIdentifier, private Collection findNodeCandidates(int loopIndex, Map allNodes, Set blackList, EnrichedResourceRequest enrichedRR) { + LinkedList retList = new LinkedList<>(); + String partition = getRequestPartition(enrichedRR); if (loopIndex > 1) { - return allNodes.values(); + for (RemoteNode remoteNode : allNodes.values()) { + if (StringUtils.equals(partition, remoteNode.getNodePartition())) { + retList.add(remoteNode); + } + } + return retList; } else { - LinkedList retList = new LinkedList<>(); + int numContainers = enrichedRR.getRequest().getNumContainers(); while (numContainers > 0) { if (loopIndex == 0) { @@ -489,8 +497,10 @@ private void allocateContainersInternal(long rmIdentifier, private int collectRackLocalCandidates(Map allNodes, EnrichedResourceRequest enrichedRR, LinkedList retList, Set blackList, int numContainers) { + String partition = getRequestPartition(enrichedRR); for (RemoteNode rNode : allNodes.values()) { - if (enrichedRR.getRackLocations().contains(rNode.getRackName())) { + if (StringUtils.equals(partition, rNode.getNodePartition()) && + enrichedRR.getRackLocations().contains(rNode.getRackName())) { if (blackList.contains(rNode.getNodeId().getHost())) { retList.addLast(rNode); } else { @@ -508,9 +518,11 @@ private int collectRackLocalCandidates(Map allNodes, private int collectNodeLocalCandidates(Map allNodes, EnrichedResourceRequest enrichedRR, List retList, int numContainers) { + String partition = getRequestPartition(enrichedRR); for (String nodeName : enrichedRR.getNodeLocations()) { RemoteNode remoteNode = allNodes.get(nodeName); - if (remoteNode != null) { + if (remoteNode != null && + StringUtils.equals(partition, remoteNode.getNodePartition())) { retList.add(remoteNode); numContainers--; } @@ -563,7 +575,7 @@ private Container createContainer(long rmIdentifier, long tokenExpiry, capability, currTime + tokenExpiry, tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier, schedulerKey.getPriority(), currTime, - null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK, + null, node.getNodePartition(), ContainerType.TASK, ExecutionType.OPPORTUNISTIC, schedulerKey.getAllocationRequestId()); byte[] pwd = tokenSecretManager.createPassword(containerTokenIdentifier); @@ -616,4 +628,12 @@ public PartitionedResourceRequests partitionAskList( } return partitionedRequests; } + + private String getRequestPartition(EnrichedResourceRequest enrichedRR) { + String partition = enrichedRR.getRequest().getNodeLabelExpression(); + if (partition == null) { + partition = CommonNodeLabelsManager.NO_LABEL; + } + return partition; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 1b090bf232b..5153cfb33df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -31,6 +31,7 @@ message RemoteNodeProto { optional NodeIdProto node_id = 1; optional string http_address = 2; optional string rack_name = 3; + optional string nodePartition = 4; } message RegisterDistributedSchedulingAMResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java index 788b0b386a8..b6f49ac58e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java @@ -596,4 +596,38 @@ public void testLotsOfContainersRackLocalAllocation() } Assert.assertEquals(100, containers.size()); } + + @Test + public void testAllocationWithNodeLabels() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List reqs = + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 1, true, "label", + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h1", 1234), "h1:1234", "/r1"))); + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + Assert.assertEquals(0, containers.size()); + Assert.assertEquals(1, oppCntxt.getOutstandingOpReqs().size()); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h1", 1234), "h1:1234", "/r1", + "label"))); + + containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + Assert.assertEquals(1, containers.size()); + Assert.assertEquals(0, oppCntxt.getOutstandingOpReqs().size()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index ce425dfaac4..724556e8bec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol; @@ -174,6 +175,16 @@ public void allocate(ApplicationAttemptId appAttemptId, appAttempt.getOpportunisticContainerContext(); oppCtx.updateNodeList(getLeastLoadedNodes()); + if (!partitionedAsks.getOpportunistic().isEmpty()) { + String appPartition = appAttempt.getAppAMNodePartitionName(); + + for (ResourceRequest req : partitionedAsks.getOpportunistic()) { + if (null == req.getNodeLabelExpression()) { + req.setNodeLabelExpression(appPartition); + } + } + } + List oppContainers = oppContainerAllocator.allocateContainers( request.getResourceBlacklistRequest(), @@ -378,7 +389,6 @@ public void handle(SchedulerEvent event) { nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), nodeResourceUpdatedEvent.getResourceOption()); break; - // <-- IGNORED EVENTS : START --> case APP_ADDED: break; @@ -436,6 +446,7 @@ private RemoteNode convertToRemoteNode(NodeId nodeId) { if (node != null) { RemoteNode rNode = RemoteNode.newInstance(nodeId, node.getHttpAddress()); rNode.setRackName(node.getRackName()); + rNode.setNodePartition(node.getPartition()); return rNode; } return null;