diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 0ce19763803..48473a617ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -72,7 +72,7 @@ private static final int RACK_LOCAL_LOOP = 1; private static final int OFF_SWITCH_LOOP = 2; - private int maxAllocationsPerAMHeartbeat = -1; + protected int maxAllocationsPerAMHeartbeat = -1; /** * This class encapsulates application specific parameters used to build a @@ -220,7 +220,7 @@ public long generateContainerId() { private final BaseContainerTokenSecretManager tokenSecretManager; - static class Allocation { + protected static class Allocation { private final Container container; private final String resourceName; @@ -229,16 +229,16 @@ public long generateContainerId() { this.resourceName = resourceName; } - Container getContainer() { + public Container getContainer() { return container; } - String getResourceName() { + public String getResourceName() { return resourceName; } } - static class EnrichedResourceRequest { + protected static class EnrichedResourceRequest { private final Map nodeLocations = new HashMap<>(); private final Map rackLocations = new HashMap<>(); private final ResourceRequest request; @@ -253,7 +253,7 @@ long getTimestamp() { return timestamp; } - ResourceRequest getRequest() { + public ResourceRequest getRequest() { return request; } @@ -284,11 +284,11 @@ void removeLocation(String location) { } } - Set getNodeLocations() { + public Set getNodeLocations() { return nodeLocations.keySet(); } - Set getRackLocations() { + public Set getRackLocations() { return rackLocations.keySet(); } } @@ -395,7 +395,7 @@ void setMaxAllocationsPerAMHeartbeat(int maxAllocationsPerAMHeartbeat) { return allocatedContainers; } - private int getTotalAllocations( + protected int getTotalAllocations( List>> allocations) { int totalAllocs = 0; for (Map> allocation : allocations) { @@ -423,7 +423,7 @@ private int getTotalAllocations( remainingAllocs = maxAllocations - totalAllocated; if (remainingAllocs <= 0) { LOG.info("Not allocating more containers as max allocations per AM " - + "heartbeat {} has reached", maxAllocationsPerAMHeartbeat); + + "heartbeat {} has reached", maxAllocationsPerAMHeartbeat); break; } } @@ -434,8 +434,8 @@ private int getTotalAllocations( ResourceRequest anyAsk = enrichedAsk.getRequest(); if (!containers.isEmpty()) { LOG.info("Opportunistic allocation requested for [priority={}, " - + "allocationRequestId={}, num_containers={}, capability={}] " - + "allocated = {}", anyAsk.getPriority(), + + "allocationRequestId={}, num_containers={}, capability={}] " + + "allocated = {}", anyAsk.getPriority(), anyAsk.getAllocationRequestId(), anyAsk.getNumContainers(), anyAsk.getCapability(), containers.keySet()); } @@ -458,7 +458,7 @@ private void allocateContainersInternal(long rmIdentifier, ResourceRequest anyAsk = enrichedAsk.getRequest(); int toAllocate = anyAsk.getNumContainers() - (allocations.isEmpty() ? 0 : - allocations.get(anyAsk.getCapability()).size()); + allocations.get(anyAsk.getCapability()).size()); toAllocate = Math.min(toAllocate, appParams.getMaxAllocationsPerSchedulerKeyPerRound()); if (maxAllocations >= 0) { @@ -622,7 +622,7 @@ private int collectNodeLocalCandidates(Map allNodes, return numContainers; } - private Container createContainer(long rmIdentifier, + protected Container createContainer(long rmIdentifier, AllocationParams appParams, ContainerIdGenerator idCounter, ApplicationAttemptId id, String userName, Map> allocations, String location, @@ -654,7 +654,7 @@ private Container buildContainer(long rmIdentifier, SchedulerRequestKey.create(rr), userName, node, cId, capability); } - private Container createContainer(long rmIdentifier, long tokenExpiry, + protected Container createContainer(long rmIdentifier, long tokenExpiry, SchedulerRequestKey schedulerKey, String userName, RemoteNode node, ContainerId cId, Resource capability) { long currTime = System.currentTimeMillis(); @@ -733,4 +733,4 @@ private String getRemoteNodePartition(RemoteNode node) { } return partition; } -} +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index a360ed2b652..3c8d09bde1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -20,6 +20,8 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.CentralizedOpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -37,7 +39,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol; import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl; @@ -74,9 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; -import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; -import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import java.io.IOException; import java.net.InetSocketAddress; @@ -100,7 +99,7 @@ LoggerFactory.getLogger(OpportunisticContainerAllocatorAMService.class); private final NodeQueueLoadMonitor nodeMonitor; - private final OpportunisticContainerAllocator oppContainerAllocator; + private final CentralizedOpportunisticContainerAllocator oppContainerAllocator; private final int k; @@ -164,7 +163,7 @@ public void allocate(ApplicationAttemptId appAttemptId, AllocateRequest request, AllocateResponse response) throws YarnException { // Partition requests to GUARANTEED and OPPORTUNISTIC. - OpportunisticContainerAllocator.PartitionedResourceRequests + CentralizedOpportunisticContainerAllocator.PartitionedResourceRequests partitionedAsks = oppContainerAllocator.partitionAskList(request.getAskList()); @@ -181,7 +180,6 @@ public void allocate(ApplicationAttemptId appAttemptId, OpportunisticContainerContext oppCtx = appAttempt.getOpportunisticContainerContext(); - oppCtx.updateNodeList(getLeastLoadedNodes()); if (!partitionedAsks.getOpportunistic().isEmpty()) { String appPartition = appAttempt.getAppAMNodePartitionName(); @@ -233,27 +231,23 @@ public OpportunisticContainerAllocatorAMService(RMContext rmContext, YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT, YarnConfiguration. DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT); - this.oppContainerAllocator = new OpportunisticContainerAllocator( - rmContext.getContainerTokenSecretManager(), - maxAllocationsPerAMHeartbeat); - this.k = rmContext.getYarnConfiguration().getInt( - YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED, - YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED); long nodeSortInterval = rmContext.getYarnConfiguration().getLong( YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, YarnConfiguration. DEFAULT_NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS); this.cacheRefreshInterval = nodeSortInterval; - this.lastCacheUpdateTime = System.currentTimeMillis(); NodeQueueLoadMonitor.LoadComparator comparator = NodeQueueLoadMonitor.LoadComparator.valueOf( rmContext.getYarnConfiguration().get( YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR, YarnConfiguration. DEFAULT_NM_CONTAINER_QUEUING_LOAD_COMPARATOR)); - NodeQueueLoadMonitor topKSelector = new NodeQueueLoadMonitor(nodeSortInterval, comparator); + this.k = rmContext.getYarnConfiguration().getInt( + YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED, + YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED); + this.lastCacheUpdateTime = System.currentTimeMillis(); float sigma = rmContext.getYarnConfiguration() .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV, @@ -285,6 +279,9 @@ public OpportunisticContainerAllocatorAMService(RMContext rmContext, topKSelector.initThresholdCalculator(sigma, limitMin, limitMax); this.nodeMonitor = topKSelector; + this.oppContainerAllocator = new CentralizedOpportunisticContainerAllocator( + rmContext.getContainerTokenSecretManager(), + maxAllocationsPerAMHeartbeat, nodeMonitor); } @Override @@ -483,12 +480,4 @@ private RemoteNode convertToRemoteNode(NodeId nodeId) { } return null; } - - private static ApplicationAttemptId getAppAttemptId() throws YarnException { - AMRMTokenIdentifier amrmTokenIdentifier = - YarnServerSecurityUtils.authorizeRequest(); - ApplicationAttemptId applicationAttemptId = - amrmTokenIdentifier.getApplicationAttemptId(); - return applicationAttemptId; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/CentralizedOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/CentralizedOpportunisticContainerAllocator.java new file mode 100644 index 00000000000..efd8b3817a8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/CentralizedOpportunisticContainerAllocator.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; +import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class CentralizedOpportunisticContainerAllocator extends + OpportunisticContainerAllocator { + + private static final Logger LOG = + LoggerFactory.getLogger(CentralizedOpportunisticContainerAllocator.class); + + private NodeQueueLoadMonitor nodeQueueLoadMonitor; + + /** + * Create a new Opportunistic Container Allocator. + * @param tokenSecretManager TokenSecretManager + * @param maxAllocationsPerAMHeartbeat max number of containers to be + * allocated in one AM heartbeat + */ + public CentralizedOpportunisticContainerAllocator( + BaseContainerTokenSecretManager tokenSecretManager, + int maxAllocationsPerAMHeartbeat, + NodeQueueLoadMonitor nodeQueueLoadMonitor) { + super(tokenSecretManager, maxAllocationsPerAMHeartbeat); + this.nodeQueueLoadMonitor = nodeQueueLoadMonitor; + } + + @Override + public List allocateContainers( + ResourceBlacklistRequest blackList, List oppResourceReqs, + ApplicationAttemptId applicationAttemptId, + OpportunisticContainerContext opportContext, long rmIdentifier, + String appSubmitter) throws YarnException { + + // Update black list. + if (blackList != null) { + opportContext.getBlacklist().removeAll(blackList.getBlacklistRemovals()); + opportContext.getBlacklist().addAll(blackList.getBlacklistAdditions()); + } + + // Add OPPORTUNISTIC requests to the outstanding ones. + opportContext.addToOutstandingReqs(oppResourceReqs); + Set nodeBlackList = new HashSet<>(opportContext.getBlacklist()); + List allocatedContainers = new ArrayList<>(); + + // Satisfy the outstanding OPPORTUNISTIC requests. + boolean continueLoop = true; + while (continueLoop) { + continueLoop = false; + List>> allocations = new ArrayList<>(); + for (SchedulerRequestKey schedulerKey : + opportContext.getOutstandingOpReqs().descendingKeySet()) { + // Allocated containers : + // Key = Requested Capability, + // Value = List of Containers of given cap (the actual container size + // might be different than what is requested, which is why + // we need the requested capability (key) to match against + // the outstanding reqs) + int remAllocs = -1; + if (maxAllocationsPerAMHeartbeat > 0) { + remAllocs = + maxAllocationsPerAMHeartbeat - allocatedContainers.size() + - getTotalAllocations(allocations); + if (remAllocs <= 0) { + LOG.info("Not allocating more containers as we have reached max " + + "allocations per AM heartbeat {}", + maxAllocationsPerAMHeartbeat); + break; + } + } + Map> allocation = allocate( + rmIdentifier, opportContext, schedulerKey, applicationAttemptId, + appSubmitter, nodeBlackList, remAllocs); + if (allocation.size() > 0) { + allocations.add(allocation); + continueLoop = true; + } + } + for (Map> allocation : allocations) { + for (Map.Entry> e : allocation.entrySet()) { + opportContext.matchAllocationToOutstandingRequest( + e.getKey(), e.getValue()); + for (Allocation alloc : e.getValue()) { + allocatedContainers.add(alloc.getContainer()); + } + } + } + } + return allocatedContainers; + } + + private Map> allocate(long rmIdentifier, + OpportunisticContainerContext appContext, SchedulerRequestKey schedKey, + ApplicationAttemptId appAttId, String userName, Set blackList, + int maxAllocations) + throws YarnException { + Map> allocations = new HashMap<>(); + for (EnrichedResourceRequest enrichedAsk : + appContext.getOutstandingOpReqs().get(schedKey).values()) { + + int remainingAllocs = -1; + if (maxAllocations > 0) { + int totalAllocated = 0; + for (List allocs : allocations.values()) { + totalAllocated += allocs.size(); + } + remainingAllocs = maxAllocations - totalAllocated; + if (remainingAllocs <= 0) { + LOG.info("Not allocating more containers as max allocations per AM " + + "heartbeat {} has reached", maxAllocationsPerAMHeartbeat); + break; + } + } + + allocateContainersInternal(rmIdentifier, appContext.getAppParams(), + appContext.getContainerIdGenerator(), blackList, + appAttId, userName, allocations, enrichedAsk, + remainingAllocs); + ResourceRequest anyAsk = enrichedAsk.getRequest(); + if (!allocations.isEmpty()) { + LOG.info("Opportunistic allocation requested for [priority={}, " + + "allocationRequestId={}, num_containers={}, capability={}] " + + "allocated = {}", anyAsk.getPriority(), + anyAsk.getAllocationRequestId(), anyAsk.getNumContainers(), + anyAsk.getCapability(), allocations.keySet()); + } + } + return allocations; + } + + private void allocateContainersInternal(long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + Set blacklist, + ApplicationAttemptId id, + String userName, Map> allocations, + EnrichedResourceRequest enrichedAsk, int maxAllocations) + throws YarnException { + ResourceRequest anyAsk = enrichedAsk.getRequest(); + int toAllocate = anyAsk.getNumContainers() + - (allocations.isEmpty() ? 0 : + allocations.get(anyAsk.getCapability()).size()); + toAllocate = Math.min(toAllocate, + appParams.getMaxAllocationsPerSchedulerKeyPerRound()); + if (maxAllocations >= 0) { + toAllocate = Math.min(maxAllocations, toAllocate); + } + // allocate node local + List allocatedContainers = + allocateNodeLocal(enrichedAsk, toAllocate, rmIdentifier, appParams, + idCounter, blacklist, id, userName, allocations); + toAllocate -= allocatedContainers.size(); + + // if still left, allocate rack local + if (toAllocate > 0) { + allocatedContainers = + allocateRackLocal(enrichedAsk, toAllocate, rmIdentifier, appParams, + idCounter, blacklist, id, userName, allocations); + toAllocate -= allocatedContainers.size(); + } + + // if still left, try on ANY + if (toAllocate > 0) { + allocateAny(enrichedAsk, toAllocate, rmIdentifier, appParams, + idCounter, blacklist, id, userName, allocations); + } + } + + private List allocateNodeLocal(EnrichedResourceRequest enrichedAsk, + int toAllocate, long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + Set blacklist, + ApplicationAttemptId id, + String userName, Map> allocations) + throws YarnException{ + Set nodeLocations = enrichedAsk.getNodeLocations(); + OpportunisticSchedulerMetrics metrics = + OpportunisticSchedulerMetrics.getMetrics(); + List allocatedContainers = new ArrayList<>(); + while (toAllocate > 0) { + int numAllocated = 0; + for (String nodeLocation : nodeLocations) { + if (toAllocate <= 0) { + break; + } + RMNode node = nodeQueueLoadMonitor.selectLocalNode(nodeLocation, + blacklist); + if (node != null) { + ++numAllocated; + toAllocate--; + Container container = createContainer(rmIdentifier, appParams, + idCounter, id, userName, allocations, nodeLocation, + enrichedAsk.getRequest(), convertToRemoteNode(node)); + allocatedContainers.add(container); + LOG.info("Allocated [" + container.getId() + "] as opportunistic at " + + "location [" + nodeLocation + "]"); + metrics.incrNodeLocalOppContainers(); + } + } + // we couldn't allocate any - break the loop. + if (numAllocated == 0) { + break; + } + } + return allocatedContainers; + } + + private List allocateRackLocal(EnrichedResourceRequest enrichedAsk, + int toAllocate, long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + Set blacklist, + ApplicationAttemptId id, + String userName, Map> allocations) + throws YarnException{ + Set rackLocations = enrichedAsk.getRackLocations(); + List allocatedContainers = new ArrayList<>(); + OpportunisticSchedulerMetrics metrics = + OpportunisticSchedulerMetrics.getMetrics(); + while (toAllocate > 0) { + int numAllocated = 0; + for (String rackLocation : rackLocations) { + if (toAllocate <= 0) { + break; + } + RMNode node = nodeQueueLoadMonitor.selectRackLocalNode(rackLocation, + blacklist); + if (node != null) { + ++numAllocated; + toAllocate--; + Container container = createContainer(rmIdentifier, appParams, + idCounter, id, userName, allocations, rackLocation, + enrichedAsk.getRequest(), convertToRemoteNode(node)); + allocatedContainers.add(container); + metrics.incrRackLocalOppContainers(); + LOG.info("Allocated [" + container.getId() + "] as opportunistic at " + + "location [" + rackLocation + "]"); + } + } + // we couldn't allocate any - break the loop. + if (numAllocated == 0) { + break; + } + } + return allocatedContainers; + } + + private List allocateAny(EnrichedResourceRequest enrichedAsk, + int toAllocate, long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + Set blacklist, + ApplicationAttemptId id, + String userName, Map> allocations) + throws YarnException { + List allocatedContainers = new ArrayList<>(); + OpportunisticSchedulerMetrics metrics = + OpportunisticSchedulerMetrics.getMetrics(); + while (toAllocate > 0) { + int numAllocated = 0; + RMNode node = nodeQueueLoadMonitor.selectAnyNode(blacklist); + if (node != null) { + ++numAllocated; + toAllocate--; + Container container = createContainer(rmIdentifier, appParams, + idCounter, id, userName, allocations, ResourceRequest.ANY, + enrichedAsk.getRequest(), convertToRemoteNode(node)); + allocatedContainers.add(container); + metrics.incrOffSwitchOppContainers(); + LOG.info("Allocated [" + container.getId() + "] as opportunistic at " + + "location [" + ResourceRequest.ANY + "]"); + } + // we couldn't allocate any - break the loop. + if (numAllocated == 0) { + break; + } + } + return allocatedContainers; + } + + private RemoteNode convertToRemoteNode(RMNode rmNode) { + if (rmNode != null) { + RemoteNode rNode = RemoteNode.newInstance(rmNode.getNodeID(), + rmNode.getHttpAddress()); + rNode.setRackName(rmNode.getRackName()); + return rNode; + } + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java index e093b2d9974..4f7faf5360d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -34,10 +35,14 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; /** @@ -48,7 +53,7 @@ */ public class NodeQueueLoadMonitor implements ClusterMonitor { - final static Logger LOG = LoggerFactory. + private final static Logger LOG = LoggerFactory. getLogger(NodeQueueLoadMonitor.class); /** @@ -68,14 +73,53 @@ public int compare(ClusterNode o1, ClusterNode o2) { } public int getMetric(ClusterNode c) { - return (this == QUEUE_LENGTH) ? c.queueLength : c.queueWaitTime; + return (this == QUEUE_LENGTH) ? + c.queueLength.get() : c.queueWaitTime.get(); + } + + public int incrementMetricAndGet(ClusterNode c, int increaseSize) { + if(this == QUEUE_LENGTH) { + return c.queueLength.addAndGet(increaseSize); + } else { + throw new NotImplementedException( + "Incrementing queue wait time isn't supported"); + } + } + + public int decrementMetricAndGet(ClusterNode c, int decrementSize) { + if(this == QUEUE_LENGTH) { + return c.queueLength.addAndGet(-decrementSize); + } else { + throw new NotImplementedException( + "Decrementing queue wait time isn't supported"); + } + } + + /** + * Increment the metric by a delta if it is below the threshold. + * @param c {@link ClusterNode} + * @param incrementSize increment size + * @return true if the metric was below threshold and was incremented. + */ + public boolean compareAndIncrement(ClusterNode c, int incrementSize) { + if(this == QUEUE_LENGTH) { + int ret = c.queueLength.addAndGet(incrementSize); + if (ret <= c.queueCapacity) { + return true; + } + c.queueLength.addAndGet(-incrementSize); + return false; + } else { + throw new NotImplementedException( + "Incrementing queue wait time isn't supported"); + } } } static class ClusterNode { - int queueLength = 0; - int queueWaitTime = -1; - double timestamp; + AtomicInteger queueLength = new AtomicInteger(0); + AtomicInteger queueWaitTime = new AtomicInteger(-1); + long timestamp; final NodeId nodeId; private int queueCapacity = 0; @@ -85,12 +129,12 @@ public ClusterNode(NodeId nodeId) { } public ClusterNode setQueueLength(int qLength) { - this.queueLength = qLength; + this.queueLength.set(qLength); return this; } public ClusterNode setQueueWaitTime(int wTime) { - this.queueWaitTime = wTime; + this.queueWaitTime.set(wTime); return this; } @@ -106,7 +150,7 @@ public ClusterNode setQueueCapacity(int capacity) { public boolean isQueueFull() { return this.queueCapacity > 0 && - this.queueLength >= this.queueCapacity; + this.queueLength.get() >= this.queueCapacity; } } @@ -115,6 +159,10 @@ public boolean isQueueFull() { private final List sortedNodes; private final Map clusterNodes = new ConcurrentHashMap<>(); + private final Map nodeByHostName = + new ConcurrentHashMap<>(); + private final Map> nodeIdsByRack = + new ConcurrentHashMap<>(); private final LoadComparator comparator; private QueueLimitCalculator thresholdCalculator; private ReentrantReadWriteLock sortedNodesLock = new ReentrantReadWriteLock(); @@ -184,15 +232,17 @@ public void initThresholdCalculator(float sigma, int limitMin, int limitMax) { @Override public void addNode(List containerStatuses, RMNode rmNode) { - LOG.debug("Node added event from: {}", rmNode.getNode().getName()); - + this.nodeByHostName.put(rmNode.getHostName(), rmNode); + addIntoNodeIdsByRack(rmNode); // Ignoring this currently : at least one NODE_UPDATE heartbeat is // required to ensure node eligibility. } @Override public void removeNode(RMNode removedRMNode) { - LOG.debug("Node delete event for: {}", removedRMNode.getNode().getName()); + LOG.info("Node delete event for: {}", removedRMNode.getNode().getName()); + this.nodeByHostName.remove(removedRMNode.getHostName()); + removeFromNodeIdsByRack(removedRMNode); ReentrantReadWriteLock.WriteLock writeLock = clusterNodesLock.writeLock(); writeLock.lock(); ClusterNode node; @@ -303,6 +353,71 @@ public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) { } } + public RMNode selectLocalNode(String hostName, Set blacklist) { + if (blacklist.contains(hostName)) { + return null; + } + RMNode node = nodeByHostName.get(hostName); + if (node != null) { + ClusterNode clusterNode = clusterNodes.get(node.getNodeID()); + if (comparator.compareAndIncrement(clusterNode, 1)) { + return node; + } + } + return null; + } + + public RMNode selectRackLocalNode(String rackName, Set blacklist) { + Set nodesOnRack = nodeIdsByRack.get(rackName); + if (nodesOnRack != null) { + for (NodeId nodeId : nodesOnRack) { + if (!blacklist.contains(nodeId.getHost())) { + ClusterNode node = clusterNodes.get(nodeId); + if (node != null && comparator.compareAndIncrement(node, 1)) { + return nodeByHostName.get(nodeId.getHost()); + } + } + } + } + return null; + } + + public RMNode selectAnyNode(Set blacklist) { + List nodeIds = selectLeastLoadedNodes(20); + int size = nodeIds.size(); + Random rand = new Random(); + int index = rand.nextInt(size); + for (int i = 0; i < size; ++i) { + index += i; + index %= size; + NodeId nodeId = nodeIds.get(index); + if (nodeId != null && !blacklist.contains(nodeId.getHost())) { + ClusterNode node = clusterNodes.get(nodeId); + if (node != null && comparator.compareAndIncrement(node, 1)) { + return nodeByHostName.get(nodeId.getHost()); + } + } + } + return null; + } + + private void removeFromNodeIdsByRack(RMNode removedNode) { + Set nodesOnRack = nodeIdsByRack.get(removedNode.getRackName()); + if (nodesOnRack != null) { + nodesOnRack.remove(removedNode.getNodeID()); + } + } + + private void addIntoNodeIdsByRack(RMNode addedNode) { + ConcurrentSkipListSet value = new ConcurrentSkipListSet(); + String rackname = addedNode.getRackName(); + Set nodes = this.nodeIdsByRack.putIfAbsent(rackname, value); + if (null == nodes) { + nodes = value; + } + nodes.add(addedNode.getNodeID()); + } + private List sortNodes() { ReentrantReadWriteLock.ReadLock readLock = clusterNodesLock.readLock(); readLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 3543bc4707e..f8ac82e1e6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -333,7 +333,6 @@ public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, req.setNodeStatus(status); req.setLastKnownContainerTokenMasterKey(this.currentContainerTokenMasterKey); req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey); - req.setRegisteringCollectors(this.registeringCollectors); req.setTokenSequenceNo(this.tokenSequenceNo); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java index bbc0086c375..81cccb89a60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java @@ -27,7 +27,9 @@ import org.junit.Test; import org.mockito.Mockito; +import java.util.HashSet; import java.util.List; +import java.util.Set; /** * Unit tests for NodeQueueLoadMonitor. @@ -228,6 +230,116 @@ public void testContainerQueuingLimit() { } + /** + * Tests select local node. + * @throws Exception + */ + @Test + public void testSelectLocalNode() throws Exception { + NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor( + NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH); + + RMNode h1 = createRMNode("h1", 1, -1, 2, 5); + RMNode h2 = createRMNode("h2", 2, -1, 5, 5); + RMNode h3 = createRMNode("h3", 3, -1, 4, 5); + + selector.addNode(null, h1); + selector.addNode(null, h2); + selector.addNode(null, h3); + + selector.updateNode(h1); + selector.updateNode(h2); + selector.updateNode(h3); + + // basic test for selecting node which has queue length less than queue capacity. + Set blacklist = new HashSet<>(); + RMNode node = selector.selectLocalNode("h1", blacklist); + Assert.assertEquals("h1", node.getHostName()); + + // if node has been added to blacklist + blacklist.add("h1"); + node = selector.selectLocalNode("h1", blacklist); + Assert.assertNull(node); + + node = selector.selectLocalNode("h2", blacklist); + Assert.assertNull(node); + + node = selector.selectLocalNode("h3", blacklist); + Assert.assertEquals("h3", node.getHostName()); + } + + @Test + public void testSelectRackLocalNode() throws Exception { + NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor( + NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH); + + RMNode h1 = createRMNode("h1", 1, "rack1", -1, 2, 5); + RMNode h2 = createRMNode("h2", 2, "rack2", -1, 5, 5); + RMNode h3 = createRMNode("h3", 3, "rack2", -1, 4, 5); + + selector.addNode(null, h1); + selector.addNode(null, h2); + selector.addNode(null, h3); + + selector.updateNode(h1); + selector.updateNode(h2); + selector.updateNode(h3); + + // basic test for selecting node which has queue length less than queue capacity. + Set blacklist = new HashSet<>(); + RMNode node = selector.selectRackLocalNode("rack1", blacklist); + Assert.assertEquals("h1", node.getHostName()); + + // if node has been added to blacklist + blacklist.add("h1"); + node = selector.selectRackLocalNode("rack1", blacklist); + Assert.assertNull(node); + + node = selector.selectRackLocalNode("rack2", blacklist); + Assert.assertEquals("h3", node.getHostName()); + + blacklist.add("h3"); + node = selector.selectRackLocalNode("rack2", blacklist); + Assert.assertNull(node); + } + + @Test + public void testSelectAnyNode() throws Exception { + NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor( + NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH); + + RMNode h1 = createRMNode("h1", 1, "rack1", -1, 2, 5); + RMNode h2 = createRMNode("h2", 2, "rack2", -1, 5, 5); + RMNode h3 = createRMNode("h3", 3, "rack2", -1, 4, 10); + + selector.addNode(null, h1); + selector.addNode(null, h2); + selector.addNode(null, h3); + + selector.updateNode(h1); + selector.updateNode(h2); + selector.updateNode(h3); + + selector.computeTask.run(); + + Assert.assertEquals(2, selector.getSortedNodes().size()); + + // basic test for selecting node which has queue length less than queue capacity. + Set blacklist = new HashSet<>(); + RMNode node = selector.selectAnyNode(blacklist); + Assert.assertTrue(node.getHostName().equals("h1") || + node.getHostName().equals("h3")); + + // if node has been added to blacklist + blacklist.add("h1"); + node = selector.selectAnyNode(blacklist); + Assert.assertEquals("h3", node.getHostName()); + + blacklist.add("h3"); + node = selector.selectAnyNode(blacklist); + Assert.assertNull(node); + } + private RMNode createRMNode(String host, int port, int waitTime, int queueLength) { return createRMNode(host, port, waitTime, queueLength, @@ -236,20 +348,28 @@ private RMNode createRMNode(String host, int port, private RMNode createRMNode(String host, int port, int waitTime, int queueLength, NodeState state) { - return createRMNode(host, port, waitTime, queueLength, + return createRMNode(host, port, "default", waitTime, queueLength, DEFAULT_MAX_QUEUE_LENGTH, state); } private RMNode createRMNode(String host, int port, int waitTime, int queueLength, int queueCapacity) { - return createRMNode(host, port, waitTime, queueLength, queueCapacity, + return createRMNode(host, port, "default", waitTime, queueLength, + queueCapacity, NodeState.RUNNING); + } + + private RMNode createRMNode(String host, int port, String rack, + int waitTime, int queueLength, int queueCapacity) { + return createRMNode(host, port, rack, waitTime, queueLength, queueCapacity, NodeState.RUNNING); } - private RMNode createRMNode(String host, int port, + private RMNode createRMNode(String host, int port, String rack, int waitTime, int queueLength, int queueCapacity, NodeState state) { RMNode node1 = Mockito.mock(RMNode.class); NodeId nID1 = new FakeNodeId(host, port); + Mockito.when(node1.getHostName()).thenReturn(host); + Mockito.when(node1.getRackName()).thenReturn(rack); Mockito.when(node1.getNodeID()).thenReturn(nID1); Mockito.when(node1.getState()).thenReturn(state); OpportunisticContainersStatus status1 =