diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/DistributedOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/DistributedOpportunisticContainerAllocator.java new file mode 100644 index 00000000000..79eac73abd2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/DistributedOpportunisticContainerAllocator.java @@ -0,0 +1,384 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.scheduler; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; + +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; +import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; +import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + *
<p>
+ * The DistributedOpportunisticContainerAllocator allocates containers on a
+ * given list of nodes, after modifying the container sizes to respect the
+ * limits set by the ResourceManager. It tries to distribute the containers
+ * as evenly as possible.
+ * </p>
+ */ +public class DistributedOpportunisticContainerAllocator + extends OpportunisticContainerAllocator { + + private static final int NODE_LOCAL_LOOP = 0; + private static final int RACK_LOCAL_LOOP = 1; + private static final int OFF_SWITCH_LOOP = 2; + + private static final Logger LOG = + LoggerFactory.getLogger(DistributedOpportunisticContainerAllocator.class); + + /** + * Create a new Opportunistic Container Allocator. + * @param tokenSecretManager TokenSecretManager + */ + public DistributedOpportunisticContainerAllocator( + BaseContainerTokenSecretManager tokenSecretManager) { + super(tokenSecretManager); + } + + /** + * Create a new Opportunistic Container Allocator. + * @param tokenSecretManager TokenSecretManager + * @param maxAllocationsPerAMHeartbeat max number of containers to be + * allocated in one AM heartbeat + */ + public DistributedOpportunisticContainerAllocator( + BaseContainerTokenSecretManager tokenSecretManager, + int maxAllocationsPerAMHeartbeat) { + super(tokenSecretManager, maxAllocationsPerAMHeartbeat); + } + + @Override + public List allocateContainers(ResourceBlacklistRequest blackList, + List oppResourceReqs, + ApplicationAttemptId applicationAttemptId, + OpportunisticContainerContext opportContext, long rmIdentifier, + String appSubmitter) throws YarnException { + + // Update black list. + if (blackList != null) { + opportContext.getBlacklist().removeAll(blackList.getBlacklistRemovals()); + opportContext.getBlacklist().addAll(blackList.getBlacklistAdditions()); + } + + // Add OPPORTUNISTIC requests to the outstanding ones. + opportContext.addToOutstandingReqs(oppResourceReqs); + Set nodeBlackList = new HashSet<>(opportContext.getBlacklist()); + Set allocatedNodes = new HashSet<>(); + List allocatedContainers = new ArrayList<>(); + + // Satisfy the outstanding OPPORTUNISTIC requests. 
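+ // Rounds repeat until one completes without any new allocation. When
+ // maxAllocationsPerAMHeartbeat > 0 it caps the total across rounds; e.g.
+ // (illustrative numbers, not from this patch) with a cap of 10 and 4
+ // containers already allocated, at most 10 - 4 = 6 more are handed out
+ // in this heartbeat.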
+ boolean continueLoop = true; + while (continueLoop) { + continueLoop = false; + List>> allocations = new ArrayList<>(); + for (SchedulerRequestKey schedulerKey : + opportContext.getOutstandingOpReqs().descendingKeySet()) { + // Allocated containers : + // Key = Requested Capability, + // Value = List of Containers of given cap (the actual container size + // might be different than what is requested, which is why + // we need the requested capability (key) to match against + // the outstanding reqs) + int remAllocs = -1; + if (maxAllocationsPerAMHeartbeat > 0) { + remAllocs = + maxAllocationsPerAMHeartbeat - allocatedContainers.size() + - getTotalAllocations(allocations); + if (remAllocs <= 0) { + LOG.info("Not allocating more containers as we have reached max " + + "allocations per AM heartbeat {}", + maxAllocationsPerAMHeartbeat); + break; + } + } + Map> allocation = allocate( + rmIdentifier, opportContext, schedulerKey, applicationAttemptId, + appSubmitter, nodeBlackList, allocatedNodes, remAllocs); + if (allocation.size() > 0) { + allocations.add(allocation); + continueLoop = true; + } + } + for (Map> allocation : allocations) { + for (Map.Entry> e : allocation.entrySet()) { + opportContext.matchAllocationToOutstandingRequest( + e.getKey(), e.getValue()); + for (Allocation alloc : e.getValue()) { + allocatedContainers.add(alloc.getContainer()); + } + } + } + } + + return allocatedContainers; + } + + private Map> allocate(long rmIdentifier, + OpportunisticContainerContext appContext, SchedulerRequestKey schedKey, + ApplicationAttemptId appAttId, String userName, Set blackList, + Set allocatedNodes, int maxAllocations) + throws YarnException { + Map> containers = new HashMap<>(); + for (EnrichedResourceRequest enrichedAsk : + appContext.getOutstandingOpReqs().get(schedKey).values()) { + int remainingAllocs = -1; + if (maxAllocations > 0) { + int totalAllocated = 0; + for (List allocs : containers.values()) { + totalAllocated += allocs.size(); + } + remainingAllocs = maxAllocations - totalAllocated; + if (remainingAllocs <= 0) { + LOG.info("Not allocating more containers as max allocations per AM " + + "heartbeat {} has reached", maxAllocationsPerAMHeartbeat); + break; + } + } + allocateContainersInternal(rmIdentifier, appContext.getAppParams(), + appContext.getContainerIdGenerator(), blackList, allocatedNodes, + appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk, + remainingAllocs); + ResourceRequest anyAsk = enrichedAsk.getRequest(); + if (!containers.isEmpty()) { + LOG.info("Opportunistic allocation requested for [priority={}, " + + "allocationRequestId={}, num_containers={}, capability={}] " + + "allocated = {}", anyAsk.getPriority(), + anyAsk.getAllocationRequestId(), anyAsk.getNumContainers(), + anyAsk.getCapability(), containers.keySet()); + } + } + return containers; + } + + private void allocateContainersInternal(long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + Set blacklist, Set allocatedNodes, + ApplicationAttemptId id, Map allNodes, + String userName, Map> allocations, + EnrichedResourceRequest enrichedAsk, int maxAllocations) + throws YarnException { + if (allNodes.size() == 0) { + LOG.info("No nodes currently available to " + + "allocate OPPORTUNISTIC containers."); + return; + } + ResourceRequest anyAsk = enrichedAsk.getRequest(); + int toAllocate = anyAsk.getNumContainers() + - (allocations.isEmpty() ? 
0 : + allocations.get(anyAsk.getCapability()).size()); + toAllocate = Math.min(toAllocate, + appParams.getMaxAllocationsPerSchedulerKeyPerRound()); + if (maxAllocations >= 0) { + toAllocate = Math.min(maxAllocations, toAllocate); + } + int numAllocated = 0; + // Node Candidates are selected as follows: + // * Node local candidates selected in loop == 0 + // * Rack local candidates selected in loop == 1 + // * From loop == 2 onwards, we revert to off switch allocations. + int loopIndex = OFF_SWITCH_LOOP; + if (enrichedAsk.getNodeLocations().size() > 0) { + loopIndex = NODE_LOCAL_LOOP; + } + while (numAllocated < toAllocate) { + Collection nodeCandidates = + findNodeCandidates(loopIndex, allNodes, blacklist, allocatedNodes, + enrichedAsk); + for (RemoteNode rNode : nodeCandidates) { + String rNodeHost = rNode.getNodeId().getHost(); + // Ignore black list + if (blacklist.contains(rNodeHost)) { + LOG.info("Nodes for scheduling has a blacklisted node" + + " [" + rNodeHost + "].."); + continue; + } + String location = ResourceRequest.ANY; + if (loopIndex == NODE_LOCAL_LOOP) { + if (enrichedAsk.getNodeLocations().contains(rNodeHost)) { + location = rNodeHost; + } else { + continue; + } + } else if (allocatedNodes.contains(rNodeHost)) { + LOG.info("Opportunistic container has already been allocated on {}.", + rNodeHost); + continue; + } + if (loopIndex == RACK_LOCAL_LOOP) { + if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) { + location = rNode.getRackName(); + } else { + continue; + } + } + Container container = createContainer(rmIdentifier, appParams, + idCounter, id, userName, allocations, location, + anyAsk, rNode, getRemoteNodePartition(rNode)); + numAllocated++; + updateMetrics(loopIndex); + allocatedNodes.add(rNodeHost); + LOG.info("Allocated [" + container.getId() + "] as opportunistic at " + + "location [" + location + "]"); + if (numAllocated >= toAllocate) { + break; + } + } + if (loopIndex == NODE_LOCAL_LOOP && + enrichedAsk.getRackLocations().size() > 0) { + loopIndex = RACK_LOCAL_LOOP; + } else { + loopIndex++; + } + // Handle case where there are no nodes remaining after blacklist is + // considered. 
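+ // Without this guard the loop could spin forever: once loopIndex has
+ // passed OFF_SWITCH_LOOP with nothing allocated (e.g. every candidate
+ // host blacklisted), no later iteration can make progress.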
+ if (loopIndex > OFF_SWITCH_LOOP && numAllocated == 0) { + LOG.warn("Unable to allocate any opportunistic containers."); + break; + } + } + } + + + + private void updateMetrics(int loopIndex) { + OpportunisticSchedulerMetrics metrics = + OpportunisticSchedulerMetrics.getMetrics(); + if (loopIndex == NODE_LOCAL_LOOP) { + metrics.incrNodeLocalOppContainers(); + } else if (loopIndex == RACK_LOCAL_LOOP) { + metrics.incrRackLocalOppContainers(); + } else { + metrics.incrOffSwitchOppContainers(); + } + } + + private Collection findNodeCandidates(int loopIndex, + Map allNodes, Set blackList, + Set allocatedNodes, EnrichedResourceRequest enrichedRR) { + LinkedList retList = new LinkedList<>(); + String partition = getRequestPartition(enrichedRR); + if (loopIndex > 1) { + for (RemoteNode remoteNode : allNodes.values()) { + if (StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) { + retList.add(remoteNode); + } + } + return retList; + } else { + + int numContainers = enrichedRR.getRequest().getNumContainers(); + while (numContainers > 0) { + if (loopIndex == 0) { + // Node local candidates + numContainers = collectNodeLocalCandidates( + allNodes, enrichedRR, retList, numContainers); + } else { + // Rack local candidates + numContainers = + collectRackLocalCandidates(allNodes, enrichedRR, retList, + blackList, allocatedNodes, numContainers); + } + if (numContainers == enrichedRR.getRequest().getNumContainers()) { + // If there is no change in numContainers, then there is no point + // in looping again. + break; + } + } + return retList; + } + } + + private int collectRackLocalCandidates(Map allNodes, + EnrichedResourceRequest enrichedRR, LinkedList retList, + Set blackList, Set allocatedNodes, int numContainers) { + String partition = getRequestPartition(enrichedRR); + for (RemoteNode rNode : allNodes.values()) { + if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) && + enrichedRR.getRackLocations().contains(rNode.getRackName())) { + String rHost = rNode.getNodeId().getHost(); + if (blackList.contains(rHost)) { + continue; + } + if (allocatedNodes.contains(rHost)) { + retList.addLast(rNode); + } else { + retList.addFirst(rNode); + numContainers--; + } + } + if (numContainers == 0) { + break; + } + } + return numContainers; + } + + private int collectNodeLocalCandidates(Map allNodes, + EnrichedResourceRequest enrichedRR, List retList, + int numContainers) { + String partition = getRequestPartition(enrichedRR); + for (String nodeName : enrichedRR.getNodeLocations()) { + RemoteNode remoteNode = allNodes.get(nodeName); + if (remoteNode != null && + StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) { + retList.add(remoteNode); + numContainers--; + } + if (numContainers == 0) { + break; + } + } + return numContainers; + } + + private String getRequestPartition(EnrichedResourceRequest enrichedRR) { + String partition = enrichedRR.getRequest().getNodeLabelExpression(); + if (partition == null) { + partition = CommonNodeLabelsManager.NO_LABEL; + } + return partition; + } + + private String getRemoteNodePartition(RemoteNode node) { + String partition = node.getNodePartition(); + if (partition == null) { + partition = CommonNodeLabelsManager.NO_LABEL; + } + return partition; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 0ce19763803..a7d3f4a3b76 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -16,10 +16,10 @@ * limitations under the License. */ + package org.apache.hadoop.yarn.server.scheduler; import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.util.Time; @@ -33,46 +33,25 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ContainerType; - import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; -import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -/** - *
<p>
- * The OpportunisticContainerAllocator allocates containers on a given list of
- * nodes, after modifying the container sizes to respect the limits set by the
- * ResourceManager. It tries to distribute the containers as evenly as possible.
- * </p>
- */ -public class OpportunisticContainerAllocator { - - private static final int NODE_LOCAL_LOOP = 0; - private static final int RACK_LOCAL_LOOP = 1; - private static final int OFF_SWITCH_LOOP = 2; - - private int maxAllocationsPerAMHeartbeat = -1; +public abstract class OpportunisticContainerAllocator { /** * This class encapsulates application specific parameters used to build a @@ -212,15 +191,7 @@ public long generateContainerId() { } } - private static final Logger LOG = - LoggerFactory.getLogger(OpportunisticContainerAllocator.class); - - private static final ResourceCalculator RESOURCE_CALCULATOR = - new DominantResourceCalculator(); - - private final BaseContainerTokenSecretManager tokenSecretManager; - - static class Allocation { + public static class Allocation { private final Container container; private final String resourceName; @@ -229,7 +200,7 @@ public long generateContainerId() { this.resourceName = resourceName; } - Container getContainer() { + public Container getContainer() { return container; } @@ -238,7 +209,7 @@ String getResourceName() { } } - static class EnrichedResourceRequest { + public static class EnrichedResourceRequest { private final Map nodeLocations = new HashMap<>(); private final Map rackLocations = new HashMap<>(); private final ResourceRequest request; @@ -253,7 +224,7 @@ long getTimestamp() { return timestamp; } - ResourceRequest getRequest() { + public ResourceRequest getRequest() { return request; } @@ -284,29 +255,28 @@ void removeLocation(String location) { } } - Set getNodeLocations() { + public Set getNodeLocations() { return nodeLocations.keySet(); } - Set getRackLocations() { + public Set getRackLocations() { return rackLocations.keySet(); } } - /** - * Create a new Opportunistic Container Allocator. - * @param tokenSecretManager TokenSecretManager - */ + + + private static final ResourceCalculator RESOURCE_CALCULATOR = + new DominantResourceCalculator(); + + private final BaseContainerTokenSecretManager tokenSecretManager; + + protected int maxAllocationsPerAMHeartbeat = -1; + public OpportunisticContainerAllocator( BaseContainerTokenSecretManager tokenSecretManager) { this.tokenSecretManager = tokenSecretManager; } - /** - * Create a new Opportunistic Container Allocator. - * @param tokenSecretManager TokenSecretManager - * @param maxAllocationsPerAMHeartbeat max number of containers to be - * allocated in one AM heartbeat - */ public OpportunisticContainerAllocator( BaseContainerTokenSecretManager tokenSecretManager, int maxAllocationsPerAMHeartbeat) { @@ -314,9 +284,25 @@ public OpportunisticContainerAllocator( this.maxAllocationsPerAMHeartbeat = maxAllocationsPerAMHeartbeat; } - @VisibleForTesting - void setMaxAllocationsPerAMHeartbeat(int maxAllocationsPerAMHeartbeat) { - this.maxAllocationsPerAMHeartbeat = maxAllocationsPerAMHeartbeat; + /** + * Partitions a list of ResourceRequest to two separate lists, one for + * GUARANTEED and one for OPPORTUNISTIC ResourceRequests. 
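+ * Requests whose execution type is not OPPORTUNISTIC are treated as
+ * GUARANTEED.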
+ * @param askList the list of ResourceRequests to be partitioned + * @return the partitioned ResourceRequests + */ + public PartitionedResourceRequests partitionAskList( + List askList) { + PartitionedResourceRequests partitionedRequests = + new PartitionedResourceRequests(); + for (ResourceRequest rr : askList) { + if (rr.getExecutionTypeRequest().getExecutionType() == + ExecutionType.OPPORTUNISTIC) { + partitionedRequests.getOpportunistic().add(rr); + } else { + partitionedRequests.getGuaranteed().add(rr); + } + } + return partitionedRequests; } /** @@ -330,72 +316,20 @@ void setMaxAllocationsPerAMHeartbeat(int maxAllocationsPerAMHeartbeat) { * @return List of Containers. * @throws YarnException YarnException */ - public List allocateContainers(ResourceBlacklistRequest blackList, + public abstract List allocateContainers( + ResourceBlacklistRequest blackList, List oppResourceReqs, ApplicationAttemptId applicationAttemptId, OpportunisticContainerContext opportContext, long rmIdentifier, - String appSubmitter) throws YarnException { - - // Update black list. - if (blackList != null) { - opportContext.getBlacklist().removeAll(blackList.getBlacklistRemovals()); - opportContext.getBlacklist().addAll(blackList.getBlacklistAdditions()); - } + String appSubmitter) throws YarnException; - // Add OPPORTUNISTIC requests to the outstanding ones. - opportContext.addToOutstandingReqs(oppResourceReqs); - Set nodeBlackList = new HashSet<>(opportContext.getBlacklist()); - Set allocatedNodes = new HashSet<>(); - List allocatedContainers = new ArrayList<>(); - - // Satisfy the outstanding OPPORTUNISTIC requests. - boolean continueLoop = true; - while (continueLoop) { - continueLoop = false; - List>> allocations = new ArrayList<>(); - for (SchedulerRequestKey schedulerKey : - opportContext.getOutstandingOpReqs().descendingKeySet()) { - // Allocated containers : - // Key = Requested Capability, - // Value = List of Containers of given cap (the actual container size - // might be different than what is requested, which is why - // we need the requested capability (key) to match against - // the outstanding reqs) - int remAllocs = -1; - if (maxAllocationsPerAMHeartbeat > 0) { - remAllocs = - maxAllocationsPerAMHeartbeat - allocatedContainers.size() - - getTotalAllocations(allocations); - if (remAllocs <= 0) { - LOG.info("Not allocating more containers as we have reached max " - + "allocations per AM heartbeat {}", - maxAllocationsPerAMHeartbeat); - break; - } - } - Map> allocation = allocate( - rmIdentifier, opportContext, schedulerKey, applicationAttemptId, - appSubmitter, nodeBlackList, allocatedNodes, remAllocs); - if (allocation.size() > 0) { - allocations.add(allocation); - continueLoop = true; - } - } - for (Map> allocation : allocations) { - for (Map.Entry> e : allocation.entrySet()) { - opportContext.matchAllocationToOutstandingRequest( - e.getKey(), e.getValue()); - for (Allocation alloc : e.getValue()) { - allocatedContainers.add(alloc.getContainer()); - } - } - } - } - return allocatedContainers; + @VisibleForTesting + void setMaxAllocationsPerAMHeartbeat(int maxAllocationsPerAMHeartbeat) { + this.maxAllocationsPerAMHeartbeat = maxAllocationsPerAMHeartbeat; } - private int getTotalAllocations( + protected int getTotalAllocations( List>> allocations) { int totalAllocs = 0; for (Map> allocation : allocations) { @@ -406,229 +340,13 @@ private int getTotalAllocations( return totalAllocs; } - private Map> allocate(long rmIdentifier, - OpportunisticContainerContext appContext, SchedulerRequestKey 
schedKey, - ApplicationAttemptId appAttId, String userName, Set blackList, - Set allocatedNodes, int maxAllocations) - throws YarnException { - Map> containers = new HashMap<>(); - for (EnrichedResourceRequest enrichedAsk : - appContext.getOutstandingOpReqs().get(schedKey).values()) { - int remainingAllocs = -1; - if (maxAllocations > 0) { - int totalAllocated = 0; - for (List allocs : containers.values()) { - totalAllocated += allocs.size(); - } - remainingAllocs = maxAllocations - totalAllocated; - if (remainingAllocs <= 0) { - LOG.info("Not allocating more containers as max allocations per AM " - + "heartbeat {} has reached", maxAllocationsPerAMHeartbeat); - break; - } - } - allocateContainersInternal(rmIdentifier, appContext.getAppParams(), - appContext.getContainerIdGenerator(), blackList, allocatedNodes, - appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk, - remainingAllocs); - ResourceRequest anyAsk = enrichedAsk.getRequest(); - if (!containers.isEmpty()) { - LOG.info("Opportunistic allocation requested for [priority={}, " - + "allocationRequestId={}, num_containers={}, capability={}] " - + "allocated = {}", anyAsk.getPriority(), - anyAsk.getAllocationRequestId(), anyAsk.getNumContainers(), - anyAsk.getCapability(), containers.keySet()); - } - } - return containers; - } - - private void allocateContainersInternal(long rmIdentifier, - AllocationParams appParams, ContainerIdGenerator idCounter, - Set blacklist, Set allocatedNodes, - ApplicationAttemptId id, Map allNodes, - String userName, Map> allocations, - EnrichedResourceRequest enrichedAsk, int maxAllocations) - throws YarnException { - if (allNodes.size() == 0) { - LOG.info("No nodes currently available to " + - "allocate OPPORTUNISTIC containers."); - return; - } - ResourceRequest anyAsk = enrichedAsk.getRequest(); - int toAllocate = anyAsk.getNumContainers() - - (allocations.isEmpty() ? 0 : - allocations.get(anyAsk.getCapability()).size()); - toAllocate = Math.min(toAllocate, - appParams.getMaxAllocationsPerSchedulerKeyPerRound()); - if (maxAllocations >= 0) { - toAllocate = Math.min(maxAllocations, toAllocate); - } - int numAllocated = 0; - // Node Candidates are selected as follows: - // * Node local candidates selected in loop == 0 - // * Rack local candidates selected in loop == 1 - // * From loop == 2 onwards, we revert to off switch allocations. 
- int loopIndex = OFF_SWITCH_LOOP; - if (enrichedAsk.getNodeLocations().size() > 0) { - loopIndex = NODE_LOCAL_LOOP; - } - while (numAllocated < toAllocate) { - Collection nodeCandidates = - findNodeCandidates(loopIndex, allNodes, blacklist, allocatedNodes, - enrichedAsk); - for (RemoteNode rNode : nodeCandidates) { - String rNodeHost = rNode.getNodeId().getHost(); - // Ignore black list - if (blacklist.contains(rNodeHost)) { - LOG.info("Nodes for scheduling has a blacklisted node" + - " [" + rNodeHost + "].."); - continue; - } - String location = ResourceRequest.ANY; - if (loopIndex == NODE_LOCAL_LOOP) { - if (enrichedAsk.getNodeLocations().contains(rNodeHost)) { - location = rNodeHost; - } else { - continue; - } - } else if (allocatedNodes.contains(rNodeHost)) { - LOG.info("Opportunistic container has already been allocated on {}.", - rNodeHost); - continue; - } - if (loopIndex == RACK_LOCAL_LOOP) { - if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) { - location = rNode.getRackName(); - } else { - continue; - } - } - Container container = createContainer(rmIdentifier, appParams, - idCounter, id, userName, allocations, location, - anyAsk, rNode); - numAllocated++; - updateMetrics(loopIndex); - allocatedNodes.add(rNodeHost); - LOG.info("Allocated [" + container.getId() + "] as opportunistic at " + - "location [" + location + "]"); - if (numAllocated >= toAllocate) { - break; - } - } - if (loopIndex == NODE_LOCAL_LOOP && - enrichedAsk.getRackLocations().size() > 0) { - loopIndex = RACK_LOCAL_LOOP; - } else { - loopIndex++; - } - // Handle case where there are no nodes remaining after blacklist is - // considered. - if (loopIndex > OFF_SWITCH_LOOP && numAllocated == 0) { - LOG.warn("Unable to allocate any opportunistic containers."); - break; - } - } - } - - private void updateMetrics(int loopIndex) { - OpportunisticSchedulerMetrics metrics = - OpportunisticSchedulerMetrics.getMetrics(); - if (loopIndex == NODE_LOCAL_LOOP) { - metrics.incrNodeLocalOppContainers(); - } else if (loopIndex == RACK_LOCAL_LOOP) { - metrics.incrRackLocalOppContainers(); - } else { - metrics.incrOffSwitchOppContainers(); - } - } - - private Collection findNodeCandidates(int loopIndex, - Map allNodes, Set blackList, - Set allocatedNodes, EnrichedResourceRequest enrichedRR) { - LinkedList retList = new LinkedList<>(); - String partition = getRequestPartition(enrichedRR); - if (loopIndex > 1) { - for (RemoteNode remoteNode : allNodes.values()) { - if (StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) { - retList.add(remoteNode); - } - } - return retList; - } else { - - int numContainers = enrichedRR.getRequest().getNumContainers(); - while (numContainers > 0) { - if (loopIndex == 0) { - // Node local candidates - numContainers = collectNodeLocalCandidates( - allNodes, enrichedRR, retList, numContainers); - } else { - // Rack local candidates - numContainers = - collectRackLocalCandidates(allNodes, enrichedRR, retList, - blackList, allocatedNodes, numContainers); - } - if (numContainers == enrichedRR.getRequest().getNumContainers()) { - // If there is no change in numContainers, then there is no point - // in looping again. 
- break; - } - } - return retList; - } - } - - private int collectRackLocalCandidates(Map allNodes, - EnrichedResourceRequest enrichedRR, LinkedList retList, - Set blackList, Set allocatedNodes, int numContainers) { - String partition = getRequestPartition(enrichedRR); - for (RemoteNode rNode : allNodes.values()) { - if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) && - enrichedRR.getRackLocations().contains(rNode.getRackName())) { - String rHost = rNode.getNodeId().getHost(); - if (blackList.contains(rHost)) { - continue; - } - if (allocatedNodes.contains(rHost)) { - retList.addLast(rNode); - } else { - retList.addFirst(rNode); - numContainers--; - } - } - if (numContainers == 0) { - break; - } - } - return numContainers; - } - - private int collectNodeLocalCandidates(Map allNodes, - EnrichedResourceRequest enrichedRR, List retList, - int numContainers) { - String partition = getRequestPartition(enrichedRR); - for (String nodeName : enrichedRR.getNodeLocations()) { - RemoteNode remoteNode = allNodes.get(nodeName); - if (remoteNode != null && - StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) { - retList.add(remoteNode); - numContainers--; - } - if (numContainers == 0) { - break; - } - } - return numContainers; - } - - private Container createContainer(long rmIdentifier, + protected Container createContainer(long rmIdentifier, AllocationParams appParams, ContainerIdGenerator idCounter, ApplicationAttemptId id, String userName, Map> allocations, String location, - ResourceRequest anyAsk, RemoteNode rNode) throws YarnException { + ResourceRequest anyAsk, RemoteNode rNode, String nodeLabel) throws YarnException { Container container = buildContainer(rmIdentifier, appParams, - idCounter, anyAsk, id, userName, rNode); + idCounter, anyAsk, id, userName, rNode, nodeLabel); List allocList = allocations.get(anyAsk.getCapability()); if (allocList == null) { allocList = new ArrayList<>(); @@ -641,7 +359,7 @@ private Container createContainer(long rmIdentifier, private Container buildContainer(long rmIdentifier, AllocationParams appParams, ContainerIdGenerator idCounter, ResourceRequest rr, ApplicationAttemptId id, String userName, - RemoteNode node) throws YarnException { + RemoteNode node, String nodeLabel) throws YarnException { ContainerId cId = ContainerId.newContainerId(id, idCounter.generateContainerId()); @@ -651,12 +369,13 @@ private Container buildContainer(long rmIdentifier, return createContainer( rmIdentifier, appParams.getContainerTokenExpiryInterval(), - SchedulerRequestKey.create(rr), userName, node, cId, capability); + SchedulerRequestKey.create(rr), userName, node, cId, capability, + nodeLabel); } private Container createContainer(long rmIdentifier, long tokenExpiry, SchedulerRequestKey schedulerKey, String userName, RemoteNode node, - ContainerId cId, Resource capability) { + ContainerId cId, Resource capability, String nodeLabel) { long currTime = System.currentTimeMillis(); ContainerTokenIdentifier containerTokenIdentifier = new ContainerTokenIdentifier( @@ -664,7 +383,7 @@ private Container createContainer(long rmIdentifier, long tokenExpiry, capability, currTime + tokenExpiry, tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier, schedulerKey.getPriority(), currTime, - null, getRemoteNodePartition(node), ContainerType.TASK, + null, nodeLabel, ContainerType.TASK, ExecutionType.OPPORTUNISTIC, schedulerKey.getAllocationRequestId()); byte[] pwd = tokenSecretManager.createPassword(containerTokenIdentifier); @@ -681,8 +400,8 @@ private 
Container createContainer(long rmIdentifier, long tokenExpiry, private Resource normalizeCapability(AllocationParams appParams, ResourceRequest ask) { return Resources.normalize(RESOURCE_CALCULATOR, - ask.getCapability(), appParams.minResource, appParams.maxResource, - appParams.incrementResource); + ask.getCapability(), appParams.getMinResource(), + appParams.getMaxResource(), appParams.getIncrementResource()); } private static Token newContainerToken(NodeId nodeId, byte[] password, @@ -696,41 +415,4 @@ private static Token newContainerToken(NodeId nodeId, byte[] password, .buildTokenService(addr).toString()); return containerToken; } - - /** - * Partitions a list of ResourceRequest to two separate lists, one for - * GUARANTEED and one for OPPORTUNISTIC ResourceRequests. - * @param askList the list of ResourceRequests to be partitioned - * @return the partitioned ResourceRequests - */ - public PartitionedResourceRequests partitionAskList( - List askList) { - PartitionedResourceRequests partitionedRequests = - new PartitionedResourceRequests(); - for (ResourceRequest rr : askList) { - if (rr.getExecutionTypeRequest().getExecutionType() == - ExecutionType.OPPORTUNISTIC) { - partitionedRequests.getOpportunistic().add(rr); - } else { - partitionedRequests.getGuaranteed().add(rr); - } - } - return partitionedRequests; - } - - private String getRequestPartition(EnrichedResourceRequest enrichedRR) { - String partition = enrichedRR.getRequest().getNodeLabelExpression(); - if (partition == null) { - partition = CommonNodeLabelsManager.NO_LABEL; - } - return partition; - } - - private String getRemoteNodePartition(RemoteNode node) { - String partition = node.getNodePartition(); - if (partition == null) { - partition = CommonNodeLabelsManager.NO_LABEL; - } - return partition; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestDistributedOpportunisticContainerAllocator.java similarity index 99% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestDistributedOpportunisticContainerAllocator.java index 548ddad6af7..06bd151c40b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestDistributedOpportunisticContainerAllocator.java @@ -56,12 +56,12 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class TestOpportunisticContainerAllocator { +public class TestDistributedOpportunisticContainerAllocator { private static final Logger LOG = - LoggerFactory.getLogger(TestOpportunisticContainerAllocator.class); + LoggerFactory.getLogger(TestDistributedOpportunisticContainerAllocator.class); private static final int GB = 1024; - private OpportunisticContainerAllocator allocator = null; + private DistributedOpportunisticContainerAllocator allocator = null; private 
OpportunisticContainerContext oppCntxt = null; private static final Priority PRIORITY_NORMAL = Priority.newInstance(1); private static final Resource CAPABILITY_1GB = @@ -98,7 +98,7 @@ public MasterKey getCurrentKey() { return new byte[]{1, 2}; } }; - allocator = new OpportunisticContainerAllocator(secMan); + allocator = new DistributedOpportunisticContainerAllocator(secMan); oppCntxt = new OpportunisticContainerContext(); oppCntxt.getAppParams().setMinResource(Resource.newInstance(1024, 1)); oppCntxt.getAppParams().setIncrementResource(Resource.newInstance(512, 1)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index db3aaca11fb..80d3e8f4175 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.DistributedOpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.state.MultiStateTransitionListener; import org.slf4j.Logger; @@ -479,7 +480,7 @@ protected void serviceInit(Configuration conf) throws Exception { YarnConfiguration. 
DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT); ((NMContext) context).setQueueableContainerAllocator( - new OpportunisticContainerAllocator( + new DistributedOpportunisticContainerAllocator( context.getContainerTokenSecretManager(), maxAllocationsPerAMHeartbeat)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java index dee2a205fb3..e3766e9473a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.scheduler.DistributedOpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; @@ -232,7 +233,7 @@ public void setBytes(ByteBuffer bytes) {} }; nmContainerTokenSecretManager.setMasterKey(mKey); OpportunisticContainerAllocator containerAllocator = - new OpportunisticContainerAllocator(nmContainerTokenSecretManager); + new DistributedOpportunisticContainerAllocator(nmContainerTokenSecretManager); NMTokenSecretManagerInNM nmTokenSecretManagerInNM = new NMTokenSecretManagerInNM(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index a360ed2b652..30d9b2de884 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -20,6 +20,8 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.CentralizedOpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.DistributedOpportunisticContainerAllocator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -74,9 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; -import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import 
org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; -import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import java.io.IOException; import java.net.InetSocketAddress; @@ -100,7 +100,7 @@ LoggerFactory.getLogger(OpportunisticContainerAllocatorAMService.class); private final NodeQueueLoadMonitor nodeMonitor; - private final OpportunisticContainerAllocator oppContainerAllocator; + private final CentralizedOpportunisticContainerAllocator oppContainerAllocator; private final int k; @@ -137,7 +137,7 @@ public void registerApplicationMaster( if (appAttempt.getOpportunisticContainerContext() == null) { OpportunisticContainerContext opCtx = new OpportunisticContainerContext(); - opCtx.setContainerIdGenerator(new OpportunisticContainerAllocator + opCtx.setContainerIdGenerator(new DistributedOpportunisticContainerAllocator .ContainerIdGenerator() { @Override public long generateContainerId() { @@ -164,7 +164,7 @@ public void allocate(ApplicationAttemptId appAttemptId, AllocateRequest request, AllocateResponse response) throws YarnException { // Partition requests to GUARANTEED and OPPORTUNISTIC. - OpportunisticContainerAllocator.PartitionedResourceRequests + CentralizedOpportunisticContainerAllocator.PartitionedResourceRequests partitionedAsks = oppContainerAllocator.partitionAskList(request.getAskList()); @@ -181,7 +181,6 @@ public void allocate(ApplicationAttemptId appAttemptId, OpportunisticContainerContext oppCtx = appAttempt.getOpportunisticContainerContext(); - oppCtx.updateNodeList(getLeastLoadedNodes()); if (!partitionedAsks.getOpportunistic().isEmpty()) { String appPartition = appAttempt.getAppAMNodePartitionName(); @@ -233,27 +232,23 @@ public OpportunisticContainerAllocatorAMService(RMContext rmContext, YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT, YarnConfiguration. DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT); - this.oppContainerAllocator = new OpportunisticContainerAllocator( - rmContext.getContainerTokenSecretManager(), - maxAllocationsPerAMHeartbeat); - this.k = rmContext.getYarnConfiguration().getInt( - YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED, - YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED); long nodeSortInterval = rmContext.getYarnConfiguration().getLong( YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, YarnConfiguration. DEFAULT_NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS); this.cacheRefreshInterval = nodeSortInterval; - this.lastCacheUpdateTime = System.currentTimeMillis(); NodeQueueLoadMonitor.LoadComparator comparator = NodeQueueLoadMonitor.LoadComparator.valueOf( rmContext.getYarnConfiguration().get( YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR, YarnConfiguration. 
DEFAULT_NM_CONTAINER_QUEUING_LOAD_COMPARATOR)); - NodeQueueLoadMonitor topKSelector = new NodeQueueLoadMonitor(nodeSortInterval, comparator); + this.k = rmContext.getYarnConfiguration().getInt( + YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED, + YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED); + this.lastCacheUpdateTime = System.currentTimeMillis(); float sigma = rmContext.getYarnConfiguration() .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV, @@ -285,6 +280,9 @@ public OpportunisticContainerAllocatorAMService(RMContext rmContext, topKSelector.initThresholdCalculator(sigma, limitMin, limitMax); this.nodeMonitor = topKSelector; + this.oppContainerAllocator = new CentralizedOpportunisticContainerAllocator( + rmContext.getContainerTokenSecretManager(), + maxAllocationsPerAMHeartbeat, nodeMonitor); } @Override @@ -483,12 +481,4 @@ private RemoteNode convertToRemoteNode(NodeId nodeId) { } return null; } - - private static ApplicationAttemptId getAppAttemptId() throws YarnException { - AMRMTokenIdentifier amrmTokenIdentifier = - YarnServerSecurityUtils.authorizeRequest(); - ApplicationAttemptId applicationAttemptId = - amrmTokenIdentifier.getApplicationAttemptId(); - return applicationAttemptId; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/CentralizedOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/CentralizedOpportunisticContainerAllocator.java new file mode 100644 index 00000000000..625700c50d0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/CentralizedOpportunisticContainerAllocator.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; +import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class CentralizedOpportunisticContainerAllocator extends + OpportunisticContainerAllocator { + + private static final Logger LOG = + LoggerFactory.getLogger(CentralizedOpportunisticContainerAllocator.class); + + private NodeQueueLoadMonitor nodeQueueLoadMonitor; + + /** + * Create a new Opportunistic Container Allocator. + * @param tokenSecretManager TokenSecretManager + * @param maxAllocationsPerAMHeartbeat max number of containers to be + * allocated in one AM heartbeat + */ + public CentralizedOpportunisticContainerAllocator( + BaseContainerTokenSecretManager tokenSecretManager, + int maxAllocationsPerAMHeartbeat, + NodeQueueLoadMonitor nodeQueueLoadMonitor) { + super(tokenSecretManager, maxAllocationsPerAMHeartbeat); + this.nodeQueueLoadMonitor = nodeQueueLoadMonitor; + } + + @Override + public List allocateContainers( + ResourceBlacklistRequest blackList, List oppResourceReqs, + ApplicationAttemptId applicationAttemptId, + OpportunisticContainerContext opportContext, long rmIdentifier, + String appSubmitter) throws YarnException { + + // Update black list. + if (blackList != null) { + opportContext.getBlacklist().removeAll(blackList.getBlacklistRemovals()); + opportContext.getBlacklist().addAll(blackList.getBlacklistAdditions()); + } + + // Add OPPORTUNISTIC requests to the outstanding ones. + opportContext.addToOutstandingReqs(oppResourceReqs); + Set nodeBlackList = new HashSet<>(opportContext.getBlacklist()); + List allocatedContainers = new ArrayList<>(); + + // Satisfy the outstanding OPPORTUNISTIC requests. 
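+ // Same round-based loop as the distributed allocator, but with no
+ // allocatedNodes set: per-node capacity is instead enforced by the
+ // NodeQueueLoadMonitor, which bumps a node's queue length when the
+ // node is selected and rejects nodes whose queue is at capacity.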
+ boolean continueLoop = true; + while (continueLoop) { + continueLoop = false; + List>> allocations = new ArrayList<>(); + for (SchedulerRequestKey schedulerKey : + opportContext.getOutstandingOpReqs().descendingKeySet()) { + // Allocated containers : + // Key = Requested Capability, + // Value = List of Containers of given cap (the actual container size + // might be different than what is requested, which is why + // we need the requested capability (key) to match against + // the outstanding reqs) + int remAllocs = -1; + if (maxAllocationsPerAMHeartbeat > 0) { + remAllocs = + maxAllocationsPerAMHeartbeat - allocatedContainers.size() + - getTotalAllocations(allocations); + if (remAllocs <= 0) { + LOG.info("Not allocating more containers as we have reached max " + + "allocations per AM heartbeat {}", + maxAllocationsPerAMHeartbeat); + break; + } + } + Map> allocation = allocate( + rmIdentifier, opportContext, schedulerKey, applicationAttemptId, + appSubmitter, nodeBlackList, remAllocs); + if (allocation.size() > 0) { + allocations.add(allocation); + continueLoop = true; + } + } + for (Map> allocation : allocations) { + for (Map.Entry> e : allocation.entrySet()) { + opportContext.matchAllocationToOutstandingRequest( + e.getKey(), e.getValue()); + for (Allocation alloc : e.getValue()) { + allocatedContainers.add(alloc.getContainer()); + } + } + } + } + return allocatedContainers; + } + + private Map> allocate(long rmIdentifier, + OpportunisticContainerContext appContext, SchedulerRequestKey schedKey, + ApplicationAttemptId appAttId, String userName, Set blackList, + int maxAllocations) + throws YarnException { + Map> allocations = new HashMap<>(); + for (EnrichedResourceRequest enrichedAsk : + appContext.getOutstandingOpReqs().get(schedKey).values()) { + + int remainingAllocs = -1; + if (maxAllocations > 0) { + int totalAllocated = 0; + for (List allocs : allocations.values()) { + totalAllocated += allocs.size(); + } + remainingAllocs = maxAllocations - totalAllocated; + if (remainingAllocs <= 0) { + LOG.info("Not allocating more containers as max allocations per AM " + + "heartbeat {} has reached", maxAllocationsPerAMHeartbeat); + break; + } + } + + allocateContainersInternal(rmIdentifier, appContext.getAppParams(), + appContext.getContainerIdGenerator(), blackList, + appAttId, userName, allocations, enrichedAsk, + remainingAllocs); + ResourceRequest anyAsk = enrichedAsk.getRequest(); + if (!allocations.isEmpty()) { + LOG.info("Opportunistic allocation requested for [priority={}, " + + "allocationRequestId={}, num_containers={}, capability={}] " + + "allocated = {}", anyAsk.getPriority(), + anyAsk.getAllocationRequestId(), anyAsk.getNumContainers(), + anyAsk.getCapability(), allocations.keySet()); + } + } + return allocations; + } + + private void allocateContainersInternal(long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + Set blacklist, + ApplicationAttemptId id, + String userName, Map> allocations, + EnrichedResourceRequest enrichedAsk, int maxAllocations) + throws YarnException { + ResourceRequest anyAsk = enrichedAsk.getRequest(); + int toAllocate = anyAsk.getNumContainers() + - (allocations.isEmpty() ? 
0 : + allocations.get(anyAsk.getCapability()).size()); + toAllocate = Math.min(toAllocate, + appParams.getMaxAllocationsPerSchedulerKeyPerRound()); + if (maxAllocations >= 0) { + toAllocate = Math.min(maxAllocations, toAllocate); + } + // allocate node local + List allocatedContainers = + allocateNodeLocal(enrichedAsk, toAllocate, rmIdentifier, appParams, + idCounter, blacklist, id, userName, allocations); + toAllocate -= allocatedContainers.size(); + + // if still left, allocate rack local + if (toAllocate > 0) { + allocatedContainers = + allocateRackLocal(enrichedAsk, toAllocate, rmIdentifier, appParams, + idCounter, blacklist, id, userName, allocations); + toAllocate -= allocatedContainers.size(); + } + + // if still left, try on ANY + if (toAllocate > 0) { + allocateAny(enrichedAsk, toAllocate, rmIdentifier, appParams, + idCounter, blacklist, id, userName, allocations); + } + } + + private List allocateNodeLocal(EnrichedResourceRequest enrichedAsk, + int toAllocate, long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + Set blacklist, + ApplicationAttemptId id, + String userName, Map> allocations) + throws YarnException{ + Set nodeLocations = enrichedAsk.getNodeLocations(); + OpportunisticSchedulerMetrics metrics = + OpportunisticSchedulerMetrics.getMetrics(); + List allocatedContainers = new ArrayList<>(); + while (toAllocate > 0) { + int numAllocated = 0; + for (String nodeLocation : nodeLocations) { + if (toAllocate <= 0) { + break; + } + RMNode node = nodeQueueLoadMonitor.selectLocalNode(nodeLocation, + blacklist); + if (node != null) { + ++numAllocated; + toAllocate--; + Container container = createContainer(rmIdentifier, appParams, + idCounter, id, userName, allocations, nodeLocation, + enrichedAsk.getRequest(), convertToRemoteNode(node), ""); + allocatedContainers.add(container); + LOG.info("Allocated [" + container.getId() + "] as opportunistic at " + + "location [" + nodeLocation + "]"); + metrics.incrNodeLocalOppContainers(); + } + } + // we couldn't allocate any - break the loop. + if (numAllocated == 0) { + break; + } + } + return allocatedContainers; + } + + private List allocateRackLocal(EnrichedResourceRequest enrichedAsk, + int toAllocate, long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + Set blacklist, + ApplicationAttemptId id, + String userName, Map> allocations) + throws YarnException{ + Set rackLocations = enrichedAsk.getRackLocations(); + List allocatedContainers = new ArrayList<>(); + OpportunisticSchedulerMetrics metrics = + OpportunisticSchedulerMetrics.getMetrics(); + while (toAllocate > 0) { + int numAllocated = 0; + for (String rackLocation : rackLocations) { + if (toAllocate <= 0) { + break; + } + RMNode node = nodeQueueLoadMonitor.selectRackLocalNode(rackLocation, + blacklist); + if (node != null) { + ++numAllocated; + toAllocate--; + Container container = createContainer(rmIdentifier, appParams, + idCounter, id, userName, allocations, rackLocation, + enrichedAsk.getRequest(), convertToRemoteNode(node), ""); + allocatedContainers.add(container); + metrics.incrRackLocalOppContainers(); + LOG.info("Allocated [" + container.getId() + "] as opportunistic at " + + "location [" + rackLocation + "]"); + } + } + // we couldn't allocate any - break the loop. 
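+ // (i.e. a full pass over the preferred racks produced nothing, for
+ // instance because every matching node was blacklisted or its queue
+ // was already at capacity).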
+ if (numAllocated == 0) { + break; + } + } + return allocatedContainers; + } + + private List allocateAny(EnrichedResourceRequest enrichedAsk, + int toAllocate, long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + Set blacklist, + ApplicationAttemptId id, + String userName, Map> allocations) + throws YarnException { + List allocatedContainers = new ArrayList<>(); + OpportunisticSchedulerMetrics metrics = + OpportunisticSchedulerMetrics.getMetrics(); + while (toAllocate > 0) { + int numAllocated = 0; + RMNode node = nodeQueueLoadMonitor.selectAnyNode(blacklist); + if (node != null) { + ++numAllocated; + toAllocate--; + Container container = createContainer(rmIdentifier, appParams, + idCounter, id, userName, allocations, ResourceRequest.ANY, + enrichedAsk.getRequest(), convertToRemoteNode(node), ""); + allocatedContainers.add(container); + metrics.incrOffSwitchOppContainers(); + LOG.info("Allocated [" + container.getId() + "] as opportunistic at " + + "location [" + ResourceRequest.ANY + "]"); + } + // we couldn't allocate any - break the loop. + if (numAllocated == 0) { + break; + } + } + return allocatedContainers; + } + + private RemoteNode convertToRemoteNode(RMNode rmNode) { + if (rmNode != null) { + RemoteNode rNode = RemoteNode.newInstance(rmNode.getNodeID(), + rmNode.getHttpAddress()); + rNode.setRackName(rmNode.getRackName()); + return rNode; + } + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java index e093b2d9974..23d083fc989 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -34,10 +35,14 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; /** @@ -48,7 +53,7 @@ */ public class NodeQueueLoadMonitor implements ClusterMonitor { - final static Logger LOG = LoggerFactory. + private final static Logger LOG = LoggerFactory. getLogger(NodeQueueLoadMonitor.class); /** @@ -68,14 +73,53 @@ public int compare(ClusterNode o1, ClusterNode o2) { } public int getMetric(ClusterNode c) { - return (this == QUEUE_LENGTH) ? 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
index e093b2d9974..23d083fc989 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@@ -34,10 +35,14 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
@@ -48,7 +53,7 @@
  */
 public class NodeQueueLoadMonitor implements ClusterMonitor {
 
-  final static Logger LOG = LoggerFactory.
+  private final static Logger LOG = LoggerFactory.
       getLogger(NodeQueueLoadMonitor.class);
 
   /**
@@ -68,14 +73,53 @@ public int compare(ClusterNode o1, ClusterNode o2) {
     }
 
     public int getMetric(ClusterNode c) {
-      return (this == QUEUE_LENGTH) ? c.queueLength : c.queueWaitTime;
+      return (this == QUEUE_LENGTH) ?
+          c.queueLength.get() : c.queueWaitTime.get();
+    }
+
+    public int incrementMetricAndGet(ClusterNode c, int increaseSize) {
+      if (this == QUEUE_LENGTH) {
+        return c.queueLength.addAndGet(increaseSize);
+      } else {
+        throw new NotImplementedException(
+            "Incrementing queue wait time isn't supported");
+      }
+    }
+
+    public int decrementMetricAndGet(ClusterNode c, int decrementSize) {
+      if (this == QUEUE_LENGTH) {
+        return c.queueLength.addAndGet(-decrementSize);
+      } else {
+        throw new NotImplementedException(
+            "Decrementing queue wait time isn't supported");
+      }
+    }
+
+    /**
+     * Increment the metric by a delta if it is below the threshold.
+     * @param c {@link ClusterNode}
+     * @param incrementSize increment size
+     * @return true if the metric was below threshold and was incremented.
+     */
+    public boolean compareAndIncrement(ClusterNode c, int incrementSize) {
+      if (this == QUEUE_LENGTH) {
+        int ret = c.queueLength.addAndGet(incrementSize);
+        if (ret <= c.queueCapacity) {
+          return true;
+        }
+        c.queueLength.addAndGet(-incrementSize);
+        return false;
+      } else {
+        throw new NotImplementedException(
+            "Incrementing queue wait time isn't supported");
+      }
     }
   }
 
   static class ClusterNode {
-    int queueLength = 0;
-    int queueWaitTime = -1;
-    double timestamp;
+    AtomicInteger queueLength = new AtomicInteger(0);
+    AtomicInteger queueWaitTime = new AtomicInteger(-1);
+    long timestamp;
     final NodeId nodeId;
     private int queueCapacity = 0;
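Editor's note: compareAndIncrement is an optimistic reservation over the new AtomicInteger: it bumps the queue length first and rolls the bump back if the result overshoots capacity, so two racing callers can never both land in the same last slot. Note also that a node whose reported queueCapacity is still zero can never pass the ret <= c.queueCapacity check, which is why the tests later in this patch always report a positive capacity. A stripped-down model of the same pattern (OptimisticReserveSketch is illustrative, not part of the patch):

    import java.util.concurrent.atomic.AtomicInteger;

    public class OptimisticReserveSketch {
      private final AtomicInteger queueLength = new AtomicInteger(0);
      private final int queueCapacity;

      OptimisticReserveSketch(int queueCapacity) {
        this.queueCapacity = queueCapacity;
      }

      // Add first, undo on overshoot: addAndGet is atomic, so concurrent
      // callers serialize on the counter rather than on a lock.
      boolean tryReserve(int n) {
        if (queueLength.addAndGet(n) <= queueCapacity) {
          return true;
        }
        queueLength.addAndGet(-n); // roll back the failed reservation
        return false;
      }
    }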
@@ -85,12 +129,12 @@ public ClusterNode(NodeId nodeId) {
     }
 
     public ClusterNode setQueueLength(int qLength) {
-      this.queueLength = qLength;
+      this.queueLength.set(qLength);
       return this;
     }
 
     public ClusterNode setQueueWaitTime(int wTime) {
-      this.queueWaitTime = wTime;
+      this.queueWaitTime.set(wTime);
       return this;
     }
 
@@ -106,7 +150,7 @@ public ClusterNode setQueueCapacity(int capacity) {
 
     public boolean isQueueFull() {
       return this.queueCapacity > 0 &&
-          this.queueLength >= this.queueCapacity;
+          this.queueLength.get() >= this.queueCapacity;
     }
   }
 
@@ -115,6 +159,10 @@ public boolean isQueueFull() {
   private final List<NodeId> sortedNodes;
   private final Map<NodeId, ClusterNode> clusterNodes = new ConcurrentHashMap<>();
+  private final Map<String, RMNode> nodeByHostName =
+      new ConcurrentHashMap<>();
+  private final Map<String, Set<NodeId>> nodeIdsByRack =
+      new ConcurrentHashMap<>();
   private final LoadComparator comparator;
   private QueueLimitCalculator thresholdCalculator;
   private ReentrantReadWriteLock sortedNodesLock = new ReentrantReadWriteLock();
@@ -184,8 +232,8 @@ public void initThresholdCalculator(float sigma, int limitMin, int limitMax) {
   @Override
   public void addNode(List<NMContainerStatus> containerStatuses,
       RMNode rmNode) {
-    LOG.debug("Node added event from: {}", rmNode.getNode().getName());
-
+    this.nodeByHostName.put(rmNode.getHostName(), rmNode);
+    addIntoNodeIdsByRack(rmNode);
     // Ignoring this currently : at least one NODE_UPDATE heartbeat is
     // required to ensure node eligibility.
   }
@@ -193,6 +241,8 @@ public void addNode(List<NMContainerStatus> containerStatuses,
   @Override
   public void removeNode(RMNode removedRMNode) {
     LOG.debug("Node delete event for: {}", removedRMNode.getNode().getName());
+    this.nodeByHostName.remove(removedRMNode.getHostName());
+    removeFromNodeIdsByRack(removedRMNode);
     ReentrantReadWriteLock.WriteLock writeLock = clusterNodesLock.writeLock();
     writeLock.lock();
     ClusterNode node;
@@ -303,6 +353,71 @@ public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) {
     }
   }
 
+  public RMNode selectLocalNode(String hostName, Set<String> blacklist) {
+    if (blacklist.contains(hostName)) {
+      return null;
+    }
+    RMNode node = nodeByHostName.get(hostName);
+    if (node != null) {
+      // The node may not have sent a NODE_UPDATE heartbeat yet, in which
+      // case it has no ClusterNode entry and must not be selected.
+      ClusterNode clusterNode = clusterNodes.get(node.getNodeID());
+      if (clusterNode != null
+          && comparator.compareAndIncrement(clusterNode, 1)) {
+        return node;
+      }
+    }
+    return null;
+  }
+
+  public RMNode selectRackLocalNode(String rackName, Set<String> blacklist) {
+    Set<NodeId> nodesOnRack = nodeIdsByRack.get(rackName);
+    if (nodesOnRack != null) {
+      for (NodeId nodeId : nodesOnRack) {
+        if (!blacklist.contains(nodeId.getHost())) {
+          ClusterNode node = clusterNodes.get(nodeId);
+          if (node != null && comparator.compareAndIncrement(node, 1)) {
+            return nodeByHostName.get(nodeId.getHost());
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  public RMNode selectAnyNode(Set<String> blacklist) {
+    List<NodeId> nodeIds = selectLeastLoadedNodes(20);
+    int size = nodeIds.size();
+    if (size <= 0) {
+      // No candidates; also guards Random.nextInt, which rejects a zero bound.
+      return null;
+    }
+    Random rand = new Random();
+    int startIndex = rand.nextInt(size);
+    // Probe every candidate exactly once, starting from a random offset.
+    for (int i = 0; i < size; ++i) {
+      int index = (startIndex + i) % size;
+      NodeId nodeId = nodeIds.get(index);
+      if (nodeId != null && !blacklist.contains(nodeId.getHost())) {
+        ClusterNode node = clusterNodes.get(nodeId);
+        if (node != null && comparator.compareAndIncrement(node, 1)) {
+          return nodeByHostName.get(nodeId.getHost());
+        }
+      }
+    }
+    return null;
+  }
+
+  private void removeFromNodeIdsByRack(RMNode removedNode) {
+    Set<NodeId> nodesOnRack = nodeIdsByRack.get(removedNode.getRackName());
+    if (nodesOnRack != null) {
+      nodesOnRack.remove(removedNode.getNodeID());
+    }
+  }
+
+  private void addIntoNodeIdsByRack(RMNode addedNode) {
+    ConcurrentSkipListSet<NodeId> value = new ConcurrentSkipListSet<>();
+    String rackName = addedNode.getRackName();
+    Set<NodeId> nodes = this.nodeIdsByRack.putIfAbsent(rackName, value);
+    if (null == nodes) {
+      nodes = value;
+    }
+    nodes.add(addedNode.getNodeID());
+  }
+
   private List<NodeId> sortNodes() {
     ReentrantReadWriteLock.ReadLock readLock = clusterNodesLock.readLock();
     readLock.lock();
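Editor's note: two fixes were applied to the new selection code above. selectAnyNode now guards against an empty candidate list and probes offsets (startIndex + i) % size, so every sampled node is visited exactly once; the submitted version re-added i to the index each iteration, which skips some candidates and revisits others. selectLocalNode now tolerates a node that has registered but not yet heartbeated. Separately, addIntoNodeIdsByRack uses the classic putIfAbsent idiom to lazily create each rack's node set; on Java 8+ the same thing can be written more directly with computeIfAbsent. A dependency-free sketch, with String standing in for NodeId:

    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentSkipListSet;

    public class RackIndexSketch {
      private final Map<String, Set<String>> nodeIdsByRack =
          new ConcurrentHashMap<>();

      // Create the rack's set on first use and add the node id; on a
      // ConcurrentHashMap the whole step is atomic per key.
      void add(String rackName, String nodeId) {
        nodeIdsByRack
            .computeIfAbsent(rackName, r -> new ConcurrentSkipListSet<>())
            .add(nodeId);
      }
    }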
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index fe3a889af9d..2ad91e90454 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
@@ -215,6 +216,13 @@ public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
         Collections.emptyList(), isHealthy, responseId);
   }
 
+  public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy,
+      OpportunisticContainersStatus opportunisticContainersStatus)
+      throws Exception {
+    return nodeHeartbeat(Collections.emptyList(),
+        Collections.emptyList(), isHealthy, responseId,
+        opportunisticContainersStatus);
+  }
+
   public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
       long containerId, ContainerState containerState) throws Exception {
     ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
@@ -251,11 +259,20 @@ public NodeHeartbeatResponse nodeHeartbeat(
   public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
       List<Container> increasedConts, boolean isHealthy, int resId)
+      throws Exception {
+    return nodeHeartbeat(updatedStats, increasedConts, isHealthy,
+        resId, null);
+  }
+
+  public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
+      List<Container> increasedConts, boolean isHealthy, int resId,
+      OpportunisticContainersStatus oppContainerStatus)
       throws Exception {
     NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
     NodeStatus status = Records.newRecord(NodeStatus.class);
     status.setResponseId(resId);
     status.setNodeId(nodeId);
+    status.setOpportunisticContainersStatus(oppContainerStatus);
     ArrayList<ContainerId> completedContainers = new ArrayList<>();
     for (ContainerStatus stat : updatedStats) {
       if (stat.getState() == ContainerState.COMPLETE) {
@@ -277,7 +294,6 @@ public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
     req.setNodeStatus(status);
     req.setLastKnownContainerTokenMasterKey(this.currentContainerTokenMasterKey);
     req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey);
-    req.setRegisteringCollectors(this.registeringCollectors);
     req.setTokenSequenceNo(this.tokenSequenceNo);
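Editor's note: with the new overload, a test can report an opportunistic queue in the same call that drives the heartbeat, instead of poking the status into the RM-side node object afterwards. A hedged usage sketch; OppHeartbeatSketch is illustrative, and nm is assumed to be a registered MockNM from the test setup:

    import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;

    class OppHeartbeatSketch {
      static void heartbeatWithOppQueue(MockNM nm) throws Exception {
        OpportunisticContainersStatus status =
            OpportunisticContainersStatus.newInstance();
        status.setEstimatedQueueWaitTime(-1); // wait time unknown
        status.setWaitQueueLength(0);         // nothing queued yet
        status.setOpportQueueCapacity(100);   // non-zero, so the node is selectable
        nm.nodeHeartbeat(true, status);
      }
    }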
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index bdd4f7155c1..cd6be46b6d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -197,10 +197,10 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception
     RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId());
     RMNode rmNode4 = rm.getRMContext().getRMNodes().get(nm4.getNodeId());
 
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
-    nm3.nodeHeartbeat(true);
-    nm4.nodeHeartbeat(true);
+    nm1.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
+    nm2.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
+    nm3.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
+    nm4.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
 
     // Send add and update node events to AM Service.
     amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
@@ -212,10 +212,10 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception
     amservice.handle(new NodeUpdateSchedulerEvent(rmNode3));
     amservice.handle(new NodeUpdateSchedulerEvent(rmNode4));
     // All nodes 1 - 4 will be applicable for scheduling.
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
-    nm3.nodeHeartbeat(true);
-    nm4.nodeHeartbeat(true);
+    nm1.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
+    nm2.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
+    nm3.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
+    nm4.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
 
     GenericTestUtils.waitFor(() ->
         amservice.getLeastLoadedNodes().size() == 4, 10, 10 * 100);
@@ -253,7 +253,7 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception
             container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
             null, ExecutionType.GUARANTEED)));
     // Node on same host should not result in allocation
-    sameHostDiffNode.nodeHeartbeat(true);
+    sameHostDiffNode.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
     rm.drainEvents();
     allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
     Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
@@ -296,7 +296,7 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception
         .getUpdateContainerRequest().getContainerId());
 
     // Ensure after correct node heartbeats, we should get the allocation
-    allocNode.nodeHeartbeat(true);
+    allocNode.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
     rm.drainEvents();
     allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
     Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
@@ -310,10 +310,10 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception
     // Allocated cores+mem should have increased, available should decrease
     verifyMetrics(metrics, 14336, 14, 2048, 2, 2);
 
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
-    nm3.nodeHeartbeat(true);
-    nm4.nodeHeartbeat(true);
+    nm1.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
+    nm2.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
+    nm3.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
+    nm4.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
     rm.drainEvents();
 
     // Verify that the container is still in ACQUIRED state wrt the RM.
@@ -363,13 +363,8 @@ public void testContainerPromoteAfterContainerStart() throws Exception {
     RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
     RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
 
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
-
-    ((RMNodeImpl) rmNode1)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-    ((RMNodeImpl) rmNode2)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    nm1.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
+    nm2.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
 
     OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
         .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
@@ -380,8 +375,8 @@ public void testContainerPromoteAfterContainerStart() throws Exception {
     amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
     // All nodes 1 to 2 will be applicable for scheduling.
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
+    nm1.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
+    nm2.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
 
     GenericTestUtils.waitFor(() ->
         amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100);
@@ -489,13 +484,8 @@ public void testContainerPromoteAfterContainerComplete() throws Exception {
     RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
     RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
 
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
-
-    ((RMNodeImpl) rmNode1)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-    ((RMNodeImpl) rmNode2)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    nm1.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
+    nm2.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
 
     OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
         .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
@@ -506,8 +496,8 @@ public void testContainerPromoteAfterContainerComplete() throws Exception {
     amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
     // All nodes 1 to 2 will be applicable for scheduling.
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
+    nm1.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
+    nm2.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
 
     GenericTestUtils.waitFor(() ->
         amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100);
@@ -592,6 +582,7 @@ public void testContainerAutoUpdateContainer() throws Exception {
     MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
     nm1.registerNode();
+    nm1.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
 
     OpportunisticContainerAllocatorAMService amservice =
         (OpportunisticContainerAllocatorAMService) rm
             .getApplicationMasterService();
@@ -602,11 +593,6 @@ public void testContainerAutoUpdateContainer() throws Exception {
     ResourceScheduler scheduler = rm.getResourceScheduler();
     RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
 
-    nm1.nodeHeartbeat(true);
-
-    ((RMNodeImpl) rmNode1)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-
     OpportunisticContainerContext ctxt =
         ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
             .getOpportunisticContainerContext();
@@ -614,7 +600,7 @@ public void testContainerAutoUpdateContainer() throws Exception {
     amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
     amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
 
-    nm1.nodeHeartbeat(true);
+    nm1.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
 
     GenericTestUtils.waitFor(() ->
         amservice.getLeastLoadedNodes().size() == 1, 10, 10 * 100);
@@ -662,7 +648,8 @@ public void testContainerAutoUpdateContainer() throws Exception {
     Assert.assertEquals(ExecutionType.GUARANTEED,
         uc.getContainer().getExecutionType());
     // Check that the container is updated in NM through NM heartbeat response
-    NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
+    NodeHeartbeatResponse response = nm1.nodeHeartbeat(true,
+        getOppurtunisticStatus(-1, 100));
     Assert.assertEquals(1, response.getContainersToUpdate().size());
     Container containersFromNM = response.getContainersToUpdate().get(0);
     Assert.assertEquals(container.getId(), containersFromNM.getId());
@@ -692,7 +679,7 @@ public void testContainerAutoUpdateContainer() throws Exception {
     // Check that the container resources are increased in
     // NM through NM heartbeat response
     if (response.getContainersToUpdate().size() == 0) {
-      response = nm1.nodeHeartbeat(true);
+      response = nm1.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
     }
     Assert.assertEquals(1, response.getContainersToUpdate().size());
     Assert.assertEquals(Resource.newInstance(2 * GB, 1),
@@ -708,12 +695,13 @@ public void testContainerAutoUpdateContainer() throws Exception {
 
     // Check that the container resources are decreased
     // in NM through NM heartbeat response
-    response = nm1.nodeHeartbeat(true);
+    response = nm1.nodeHeartbeat(true,
+        getOppurtunisticStatus(-1, 100));
     Assert.assertEquals(1, response.getContainersToUpdate().size());
     Assert.assertEquals(Resource.newInstance(1 * GB, 1),
         response.getContainersToUpdate().get(0).getResource());
 
-    nm1.nodeHeartbeat(true);
+    nm1.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
     // DEMOTE the container
     allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(
         UpdateContainerRequest.newInstance(3, container.getId(),
@@ -735,7 +723,7 @@ public void testContainerAutoUpdateContainer() throws Exception {
         uc.getContainer().getExecutionType());
     // Check that the container is updated in NM through NM heartbeat response
     if (response.getContainersToUpdate().size() == 0) {
-      response = nm1.nodeHeartbeat(true);
+      response = nm1.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
     }
     Assert.assertEquals(1, response.getContainersToUpdate().size());
     Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
@@ -761,6 +749,10 @@ public void testOpportunisticSchedulerMetrics() throws Exception {
     nodes.put(nm2.getNodeId(), nm2);
     nm1.registerNode();
     nm2.registerNode();
+
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+
     OpportunisticSchedulerMetrics metrics =
         OpportunisticSchedulerMetrics.getMetrics();
 
@@ -777,16 +769,9 @@ public void testOpportunisticSchedulerMetrics() throws Exception {
         app1.getCurrentAppAttempt().getAppAttemptId();
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
     ResourceScheduler scheduler = rm.getResourceScheduler();
-    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
 
-    ((RMNodeImpl) rmNode1)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-    ((RMNodeImpl) rmNode2)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    nm1.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
+    nm2.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
 
     OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
         .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
@@ -797,8 +782,8 @@ public void testOpportunisticSchedulerMetrics() throws Exception {
     amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
     // All nodes 1 to 2 will be applicable for scheduling.
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
+    nm1.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
+    nm2.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
 
     GenericTestUtils.waitFor(() ->
         amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100);
@@ -890,6 +875,8 @@ public void testNodeRemovalDuringAllocate() throws Exception {
     MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
     nm1.registerNode();
     nm2.registerNode();
+    nm1.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
+    nm2.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
     OpportunisticContainerAllocatorAMService amservice =
         (OpportunisticContainerAllocatorAMService) rm
             .getApplicationMasterService();
@@ -900,12 +887,6 @@ public void testNodeRemovalDuringAllocate() throws Exception {
     ResourceScheduler scheduler = rm.getResourceScheduler();
     RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
     RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
-    ((RMNodeImpl) rmNode1)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-    ((RMNodeImpl) rmNode2)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
     OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
         .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
     // Send add and update node events to AM Service.
@@ -919,12 +900,12 @@ public void testNodeRemovalDuringAllocate() throws Exception {
           Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
               "*", Resources.createResource(1 * GB), 2)), null);
-      if (ctxt.getNodeMap().size() == 2) {
+      if (amservice.getLeastLoadedNodes().size() == 2) {
         break;
       }
       Thread.sleep(50);
     }
-    Assert.assertEquals(2, ctxt.getNodeMap().size());
+    Assert.assertEquals(2, amservice.getLeastLoadedNodes().size());
     // Remove node from scheduler but not from AM Service.
     scheduler.handle(new NodeRemovedSchedulerEvent(rmNode1));
     // After removal of node 1, only 1 node will be applicable for scheduling.
@@ -937,18 +918,19 @@ public void testNodeRemovalDuringAllocate() throws Exception {
       } catch (Exception e) {
         Assert.fail("Allocate request should be handled on node removal");
       }
-      if (ctxt.getNodeMap().size() == 1) {
+      if (amservice.getLeastLoadedNodes().size() == 1) {
         break;
       }
       Thread.sleep(50);
     }
-    Assert.assertEquals(1, ctxt.getNodeMap().size());
+    Assert.assertEquals(1, amservice.getLeastLoadedNodes().size());
   }
 
   @Test(timeout = 60000)
   public void testAppAttemptRemovalAfterNodeRemoval() throws Exception {
     MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService());
     nm.registerNode();
+    nm.nodeHeartbeat(true, getOppurtunisticStatus(-1, 100));
     OpportunisticContainerAllocatorAMService amservice =
         (OpportunisticContainerAllocatorAMService) rm
             .getApplicationMasterService();
@@ -960,9 +942,7 @@ public void testAppAttemptRemovalAfterNodeRemoval() throws Exception {
     SchedulerApplicationAttempt schedulerAttempt =
         ((CapacityScheduler)scheduler).getApplicationAttempt(attemptId);
     RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm.getNodeId());
-    nm.nodeHeartbeat(true);
-    ((RMNodeImpl) rmNode1)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+
     // Send add and update node events to AM Service.
     amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
     amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
@@ -1002,11 +982,11 @@ public void testAppAttemptRemovalAfterNodeRemoval() throws Exception {
 
   private OpportunisticContainersStatus getOppurtunisticStatus(int waitTime,
       int queueLength) {
-    OpportunisticContainersStatus status1 =
-        Mockito.mock(OpportunisticContainersStatus.class);
-    Mockito.when(status1.getEstimatedQueueWaitTime()).thenReturn(waitTime);
-    Mockito.when(status1.getWaitQueueLength()).thenReturn(queueLength);
-    return status1;
+    OpportunisticContainersStatus status =
+        OpportunisticContainersStatus.newInstance();
+    status.setEstimatedQueueWaitTime(waitTime);
+    status.setWaitQueueLength(queueLength);
+    status.setOpportQueueCapacity(1000);
+    return status;
   }
 
   // Test if the OpportunisticContainerAllocatorAMService can handle both
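Editor's note: the rewritten helper above is the linchpin of these test changes. The status now travels through the heartbeat's record layer rather than being set directly on the RM-side node, and presumably a Mockito mock cannot be marshalled along that path, so the helper builds a real record with newInstance; the explicit setOpportQueueCapacity(1000) keeps compareAndIncrement from rejecting the node outright. If a test needed a different capacity, the same construction parameterizes cleanly; a hedged variant, where the three-argument helper is hypothetical and not part of the patch:

    import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;

    class OppStatusFactorySketch {
      // Same construction as getOppurtunisticStatus above, but with the
      // queue capacity under the caller's control.
      static OpportunisticContainersStatus oppStatus(int waitTime,
          int queueLength, int queueCapacity) {
        OpportunisticContainersStatus status =
            OpportunisticContainersStatus.newInstance();
        status.setEstimatedQueueWaitTime(waitTime);
        status.setWaitQueueLength(queueLength);
        status.setOpportQueueCapacity(queueCapacity);
        return status;
      }
    }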
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
index bbc0086c375..29b3986ceee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
@@ -27,7 +27,9 @@
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Unit tests for NodeQueueLoadMonitor.
@@ -228,6 +230,114 @@ public void testContainerQueuingLimit() {
   }
 
+  /**
+   * Tests select local node.
+   * @throws Exception if the test fails.
+   */
+  @Test
+  public void testSelectLocalNode() throws Exception {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+
+    RMNode h1 = createRMNode("h1", 1, -1, 2, 5);
+    RMNode h2 = createRMNode("h2", 2, -1, 5, 5);
+    RMNode h3 = createRMNode("h3", 3, -1, 4, 5);
+
+    selector.addNode(null, h1);
+    selector.addNode(null, h2);
+    selector.addNode(null, h3);
+
+    selector.updateNode(h1);
+    selector.updateNode(h2);
+    selector.updateNode(h3);
+
+    // basic test for selecting a node whose queue length is
+    // less than its queue capacity.
+    Set<String> blacklist = new HashSet<>();
+    RMNode node = selector.selectLocalNode("h1", blacklist);
+    Assert.assertEquals("h1", node.getHostName());
+
+    // if node has been added to blacklist
+    blacklist.add("h1");
+    node = selector.selectLocalNode("h1", blacklist);
+    Assert.assertNull(node);
+
+    node = selector.selectLocalNode("h2", blacklist);
+    Assert.assertNull(node);
+
+    node = selector.selectLocalNode("h3", blacklist);
+    Assert.assertEquals("h3", node.getHostName());
+  }
+
+  @Test
+  public void testSelectRackLocalNode() throws Exception {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+
+    RMNode h1 = createRMNode("h1", 1, "rack1", -1, 2, 5);
+    RMNode h2 = createRMNode("h2", 2, "rack2", -1, 5, 5);
+    RMNode h3 = createRMNode("h3", 3, "rack2", -1, 4, 5);
+
+    selector.addNode(null, h1);
+    selector.addNode(null, h2);
+    selector.addNode(null, h3);
+
+    selector.updateNode(h1);
+    selector.updateNode(h2);
+    selector.updateNode(h3);
+
+    // basic test for selecting a node whose queue length is
+    // less than its queue capacity.
+    Set<String> blacklist = new HashSet<>();
+    RMNode node = selector.selectRackLocalNode("rack1", blacklist);
+    Assert.assertEquals("h1", node.getHostName());
+
+    // if node has been added to blacklist
+    blacklist.add("h1");
+    node = selector.selectRackLocalNode("rack1", blacklist);
+    Assert.assertNull(node);
+
+    node = selector.selectRackLocalNode("rack2", blacklist);
+    Assert.assertEquals("h3", node.getHostName());
+
+    blacklist.add("h3");
+    node = selector.selectRackLocalNode("rack2", blacklist);
+    Assert.assertNull(node);
+  }
+
+  @Test
+  public void testSelectAnyNode() throws Exception {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+
+    RMNode h1 = createRMNode("h1", 1, "rack1", -1, 2, 5);
+    RMNode h2 = createRMNode("h2", 2, "rack2", -1, 5, 5);
+    RMNode h3 = createRMNode("h3", 3, "rack2", -1, 4, 5);
+
+    selector.addNode(null, h1);
+    selector.addNode(null, h2);
+    selector.addNode(null, h3);
+
+    selector.updateNode(h1);
+    selector.updateNode(h2);
+    selector.updateNode(h3);
+
+    selector.computeTask.run();
+
+    // basic test for selecting a node whose queue length is
+    // less than its queue capacity.
+    Set<String> blacklist = new HashSet<>();
+    RMNode node = selector.selectAnyNode(blacklist);
+    Assert.assertTrue(node.getHostName().equals("h1") ||
+        node.getHostName().equals("h3"));
+
+    // if node has been added to blacklist
+    blacklist.add("h1");
+    node = selector.selectAnyNode(blacklist);
+    Assert.assertEquals("h3", node.getHostName());
+
+    blacklist.add("h3");
+    node = selector.selectAnyNode(blacklist);
+    Assert.assertNull(node);
+  }
+
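Editor's note: one subtlety worth calling out in these selector tests is that every successful select* call charges one slot against the node via compareAndIncrement, so a node stops being returned as soon as its queue length reaches its capacity; that is why h2 (queue length 5 of 5) is never selected above. A hedged sketch of a test asserting that exhaustion directly, reusing this class's createRMNode helper and otherwise illustrative only:

    @Test
    public void testSelectLocalNodeExhaustsCapacity() throws Exception {
      NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
          NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
      // queue length 4 of capacity 5: exactly one slot left
      RMNode h1 = createRMNode("h1", 1, -1, 4, 5);
      selector.addNode(null, h1);
      selector.updateNode(h1);

      Set<String> blacklist = new HashSet<>();
      // the first select takes the last slot (queue length 4 -> 5) ...
      Assert.assertNotNull(selector.selectLocalNode("h1", blacklist));
      // ... so the node is now full and a second select must return null.
      Assert.assertNull(selector.selectLocalNode("h1", blacklist));
    }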
   private RMNode createRMNode(String host, int port,
       int waitTime, int queueLength) {
     return createRMNode(host, port, waitTime, queueLength,
@@ -236,20 +346,28 @@ private RMNode createRMNode(String host, int port,
 
   private RMNode createRMNode(String host, int port,
       int waitTime, int queueLength, NodeState state) {
-    return createRMNode(host, port, waitTime, queueLength,
+    return createRMNode(host, port, "default", waitTime, queueLength,
         DEFAULT_MAX_QUEUE_LENGTH, state);
   }
 
   private RMNode createRMNode(String host, int port,
       int waitTime, int queueLength, int queueCapacity) {
-    return createRMNode(host, port, waitTime, queueLength, queueCapacity,
+    return createRMNode(host, port, "default", waitTime, queueLength,
+        queueCapacity,
         NodeState.RUNNING);
   }
+
+  private RMNode createRMNode(String host, int port, String rack,
+      int waitTime, int queueLength, int queueCapacity) {
+    return createRMNode(host, port, rack, waitTime, queueLength, queueCapacity,
+        NodeState.RUNNING);
+  }
 
-  private RMNode createRMNode(String host, int port,
+  private RMNode createRMNode(String host, int port, String rack,
       int waitTime, int queueLength, int queueCapacity, NodeState state) {
     RMNode node1 = Mockito.mock(RMNode.class);
     NodeId nID1 = new FakeNodeId(host, port);
+    Mockito.when(node1.getHostName()).thenReturn(host);
+    Mockito.when(node1.getRackName()).thenReturn(rack);
     Mockito.when(node1.getNodeID()).thenReturn(nID1);
     Mockito.when(node1.getState()).thenReturn(state);
     OpportunisticContainersStatus status1 =