diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java index dfe85f2..f8f2651 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java @@ -91,7 +91,7 @@ public void doTest(int numMappers, int numReducers, int numNodes, Configuration conf = new Configuration(); // Start the mini-MR and mini-DFS clusters conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); - conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); + conf.setBoolean(YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true); dfsCluster = new MiniDFSCluster.Builder(conf) .numDataNodes(numNodes).build(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6c921cd..5788e0a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -306,55 +306,60 @@ public static boolean isAclEnabled(Configuration conf) { YARN_PREFIX + "distributed-scheduling.enabled"; public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false; - /** Minimum memory (in MB) used for allocating a container through distributed - * scheduling. */ - public static final String DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB = - YARN_PREFIX + "distributed-scheduling.min-container-memory-mb"; - public static final int DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT = 512; - - /** Minimum virtual CPU cores used for allocating a container through - * distributed scheduling. */ - public static final String DIST_SCHEDULING_MIN_CONTAINER_VCORES = - YARN_PREFIX + "distributed-scheduling.min-container-vcores"; - public static final int DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT = 1; - - /** Maximum memory (in MB) used for allocating a container through distributed - * scheduling. */ - public static final String DIST_SCHEDULING_MAX_MEMORY_MB = - YARN_PREFIX + "distributed-scheduling.max-container-memory-mb"; - public static final int DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT = 2048; - - /** Maximum virtual CPU cores used for allocating a container through - * distributed scheduling. */ - public static final String DIST_SCHEDULING_MAX_CONTAINER_VCORES = - YARN_PREFIX + "distributed-scheduling.max-container-vcores"; - public static final int DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT = 4; - - /** Incremental memory (in MB) used for allocating a container through - * distributed scheduling. 
*/ - public static final String DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB = - YARN_PREFIX + "distributed-scheduling.incr-container-memory-mb"; - public static final int DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT = + /** Setting that controls whether opportunistic container allocation + * is enabled or not. */ + public static final String OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED = + YARN_PREFIX + "opportunistic-container-alloation.enabled"; + public static final boolean + OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT = false; + + /** Minimum memory (in MB) used for allocating an opportunistic container. */ + public static final String OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB = + YARN_PREFIX + "opportunistic-containers.min-memory-mb"; + public static final int OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT = 512; + + /** Minimum virtual CPU cores used for allocating an opportunistic container. + * */ + public static final String OPPORTUNISTIC_CONTAINERS_MIN_VCORES = + YARN_PREFIX + "opportunistic-containers.min-vcores"; + public static final int OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT = 1; + + /** Maximum memory (in MB) used for allocating an opportunistic container. */ + public static final String OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB = + YARN_PREFIX + "opportunistic-containers.max-memory-mb"; + public static final int OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT = 2048; + + /** Maximum virtual CPU cores used for allocating an opportunistic container. + * */ + public static final String OPPORTUNISTIC_CONTAINERS_MAX_VCORES = + YARN_PREFIX + "opportunistic-containers.max-vcores"; + public static final int OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT = 4; + + /** Incremental memory (in MB) used for allocating an opportunistic container. + * */ + public static final String OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB = + YARN_PREFIX + "opportunistic-containers.incr-memory-mb"; + public static final int OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT = 512; - /** Incremental virtual CPU cores used for allocating a container through - * distributed scheduling. */ - public static final String DIST_SCHEDULING_INCR_CONTAINER_VCORES = - YARN_PREFIX + "distributed-scheduling.incr-vcores"; - public static final int DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT = 1; - - /** Container token expiry for container allocated via distributed - * scheduling. */ - public static final String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS = - YARN_PREFIX + "distributed-scheduling.container-token-expiry-ms"; - public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT = + /** Incremental virtual CPU cores used for allocating an opportunistic + * container. */ + public static final String OPPORTUNISTIC_CONTAINERS_INCR_VCORES = + YARN_PREFIX + "opportunistic-containers.incr-vcores"; + public static final int OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT = 1; + + /** Container token expiry for opportunistic containers. */ + public static final String OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS = + YARN_PREFIX + "opportunistic-containers.container-token-expiry-ms"; + public static final int OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT = 600000; - /** Number of nodes to be used by the LocalScheduler of a NodeManager for - * dispatching containers during distributed scheduling. 
*/ - public static final String DIST_SCHEDULING_NODES_NUMBER_USED = - YARN_PREFIX + "distributed-scheduling.nodes-used"; - public static final int DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT = 10; + /** Number of nodes to be used by the Opportunistic Container allocator for + * dispatching containers during container allocation. */ + public static final String OPPORTUNISTIC_SCHEDULING_NODES_NUMBER_USED = + YARN_PREFIX + "opportunistic-scheduling.nodes-used"; + public static final int OPPORTUNISTIC_SCHEDULING_NODES_NUMBER_USED_DEFAULT = + 10; /** Frequency for computing least loaded NMs. */ public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS = @@ -2829,6 +2834,18 @@ public static String getClusterId(Configuration conf) { return clusterId; } + public static boolean isDistSchedulingEnabled(Configuration conf) { + return conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, + YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT); + } + + public static boolean isOpportunisticContainerAllocationEnabled( + Configuration conf) { + return conf.getBoolean( + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT); + } + // helper methods for timeline service configuration /** * Returns whether the timeline service is enabled via configuration. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java index 71321e3..acc6cdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java @@ -84,7 +84,7 @@ * specifying OPPORTUNISTIC containers in its resource requests, * the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor * on the NM and the DistributedSchedulingProtocol used by the framework to talk - * to the DistributedSchedulingAMService running on the RM. + * to the OpportunisticContainerAllocatingAMService running on the RM. */ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest { @@ -105,7 +105,7 @@ public void doBefore() throws Exception { conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); - conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); + conf.setBoolean(YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true); cluster.init(conf); cluster.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java new file mode 100644 index 0000000..d9c1e14 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -0,0 +1,304 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; + +import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +/** + *

+ * The OpportunisticContainerAllocator allocates containers on a given list of + * nodes, after modifying the container sizes to respect the limits set by the + * ResourceManager. It tries to distribute the containers as evenly as possible. + *

+ */ +public class OpportunisticContainerAllocator { + + public static class AllocationParams { + private Resource maxResource; + private Resource minResource; + private Resource incrementResource; + private int containerTokenExpiryInterval; + + public Resource getMaxResource() { + return maxResource; + } + + public void setMaxResource(Resource maxResource) { + this.maxResource = maxResource; + } + + public Resource getMinResource() { + return minResource; + } + + public void setMinResource(Resource minResource) { + this.minResource = minResource; + } + + public Resource getIncrementResource() { + return incrementResource; + } + + public void setIncrementResource(Resource incrementResource) { + this.incrementResource = incrementResource; + } + + public int getContainerTokenExpiryInterval() { + return containerTokenExpiryInterval; + } + + public void setContainerTokenExpiryInterval( + int containerTokenExpiryInterval) { + this.containerTokenExpiryInterval = containerTokenExpiryInterval; + } + } + + static class PartitionedResourceRequests { + private List guaranteed = new ArrayList<>(); + private List opportunistic = new ArrayList<>(); + public List getGuaranteed() { + return guaranteed; + } + public List getOpportunistic() { + return opportunistic; + } + } + + private static final Log LOG = + LogFactory.getLog(OpportunisticContainerAllocator.class); + + private static final ResourceCalculator RESOURCE_CALCULATOR = + new DominantResourceCalculator(); + + public static class ContainerIdGenerator { + protected final AtomicLong containerIdCounter = new AtomicLong(1); + + public void resetContainerIdCounter(long containerIdStart) { + this.containerIdCounter.set(containerIdStart); + } + + public long generateContainerId() { + return this.containerIdCounter.incrementAndGet(); + } + } + + private final BaseContainerTokenSecretManager tokenSecretManager; + private int webpagePort; + + public OpportunisticContainerAllocator( + BaseContainerTokenSecretManager tokenSecretManager, int webpagePort) { + this.tokenSecretManager = tokenSecretManager; + this.webpagePort = webpagePort; + } + + public List allocateContainers( + AllocateRequest request, ApplicationAttemptId applicationAttemptId, + OpportunisticContainerContext appContext, long rmIdentifier, + String appSubmitter) throws YarnException { + // Partition requests into GUARANTEED and OPPORTUNISTIC reqs + PartitionedResourceRequests partitionedAsks = + partitionAskList(request.getAskList()); + + List releasedContainers = request.getReleaseList(); + int numReleasedContainers = releasedContainers.size(); + if (numReleasedContainers > 0) { + LOG.info("AttemptID: " + applicationAttemptId + " released: " + + numReleasedContainers); + appContext.getContainersAllocated().removeAll(releasedContainers); + } + + // Also, update black list + ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest(); + if (rbr != null) { + appContext.getBlacklist().removeAll(rbr.getBlacklistRemovals()); + appContext.getBlacklist().addAll(rbr.getBlacklistAdditions()); + } + + // Add OPPORTUNISTIC reqs to the outstanding reqs + appContext.addToOutstandingReqs(partitionedAsks.getOpportunistic()); + + List allocatedContainers = new ArrayList<>(); + for (Priority priority : + appContext.getOutstandingOpReqs().descendingKeySet()) { + // Allocated containers : + // Key = Requested Capability, + // Value = List of Containers of given Cap (The actual container size + // might be different than what is requested.. 
which is why + // we need the requested capability (key) to match against + // the outstanding reqs) + Map> allocated = allocate(rmIdentifier, + appContext, priority,applicationAttemptId, appSubmitter); + for (Map.Entry> e : allocated.entrySet()) { + appContext.matchAllocationToOutstandingRequest( + e.getKey(), e.getValue()); + allocatedContainers.addAll(e.getValue()); + } + } + + // Send all the GUARANTEED Reqs to RM + request.setAskList(partitionedAsks.getGuaranteed()); + return allocatedContainers; + } + + public Map> allocate(long rmIdentifier, + OpportunisticContainerContext appContext, Priority priority, + ApplicationAttemptId appAttId,String userName) throws YarnException { + Map> containers = new HashMap<>(); + for (ResourceRequest anyAsk : + appContext.getOutstandingOpReqs().get(priority).values()) { + allocateContainersInternal(rmIdentifier, appContext.getAppParams(), + appContext.getContainerIdGenerator(), appContext.getBlacklist(), + appAttId, appContext.getNodeMap(), userName, containers, anyAsk); + LOG.info("Opportunistic allocation requested for [" + + "priority=" + anyAsk.getPriority() + + ", num_containers=" + anyAsk.getNumContainers() + + ", capability=" + anyAsk.getCapability() + "]" + + " allocated = " + containers.get(anyAsk.getCapability()).size()); + } + return containers; + } + + private void allocateContainersInternal(long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + Set blacklist, ApplicationAttemptId id, + Map allNodes, String userName, + Map> containers, ResourceRequest anyAsk) + throws YarnException { + int toAllocate = anyAsk.getNumContainers() + - (containers.isEmpty() ? 0 : + containers.get(anyAsk.getCapability()).size()); + + List nodesForScheduling = new ArrayList<>(); + for (Entry nodeEntry : allNodes.entrySet()) { + // Do not use blacklisted nodes for scheduling. 
+ if (blacklist.contains(nodeEntry.getKey())) { + continue; + } + nodesForScheduling.add(nodeEntry.getValue()); + } + int numAllocated = 0; + int nextNodeToSchedule = 0; + for (int numCont = 0; numCont < toAllocate; numCont++) { + nextNodeToSchedule++; + nextNodeToSchedule %= nodesForScheduling.size(); + NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule); + Container container = buildContainer(rmIdentifier, appParams, idCounter, + anyAsk, id, userName, nodeId); + List cList = containers.get(anyAsk.getCapability()); + if (cList == null) { + cList = new ArrayList<>(); + containers.put(anyAsk.getCapability(), cList); + } + cList.add(container); + numAllocated++; + LOG.info("Allocated [" + container.getId() + "] as opportunistic."); + } + LOG.info("Allocated " + numAllocated + " opportunistic containers."); + } + + private Container buildContainer(long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + ResourceRequest rr, ApplicationAttemptId id, String userName, + NodeId nodeId) throws YarnException { + ContainerId cId = + ContainerId.newContainerId(id, idCounter.generateContainerId()); + + // Normalize the resource asks (Similar to what the the RM scheduler does + // before accepting an ask) + Resource capability = normalizeCapability(appParams, rr); + + long currTime = System.currentTimeMillis(); + ContainerTokenIdentifier containerTokenIdentifier = + new ContainerTokenIdentifier( + cId, nodeId.getHost() + ":" + nodeId.getPort(), userName, + capability, currTime + appParams.containerTokenExpiryInterval, + tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier, + rr.getPriority(), currTime, + null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK, + ExecutionType.OPPORTUNISTIC); + byte[] pwd = + tokenSecretManager.createPassword(containerTokenIdentifier); + Token containerToken = newContainerToken(nodeId, pwd, + containerTokenIdentifier); + Container container = BuilderUtils.newContainer( + cId, nodeId, nodeId.getHost() + ":" + webpagePort, + capability, rr.getPriority(), containerToken, + containerTokenIdentifier.getExecutionType(), + rr.getAllocationRequestId()); + return container; + } + + private Resource normalizeCapability(AllocationParams appParams, + ResourceRequest ask) { + return Resources.normalize(RESOURCE_CALCULATOR, + ask.getCapability(), appParams.minResource, appParams.maxResource, + appParams.incrementResource); + } + + private static Token newContainerToken(NodeId nodeId, byte[] password, + ContainerTokenIdentifier tokenIdentifier) { + // RPC layer client expects ip:port as service for tokens + InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(), + nodeId.getPort()); + // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token + Token containerToken = Token.newInstance(tokenIdentifier.getBytes(), + ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil + .buildTokenService(addr).toString()); + return containerToken; + } + + private PartitionedResourceRequests partitionAskList(List + askList) { + PartitionedResourceRequests partitionedRequests = + new PartitionedResourceRequests(); + for (ResourceRequest rr : askList) { + if (rr.getExecutionTypeRequest().getExecutionType() == + ExecutionType.OPPORTUNISTIC) { + partitionedRequests.getOpportunistic().add(rr); + } else { + partitionedRequests.getGuaranteed().add(rr); + } + } + return partitionedRequests; + } +} diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java new file mode 100644 index 0000000..33946b8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.scheduler; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.AllocationParams; +import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator; + +public class OpportunisticContainerContext { + + private static final Logger LOG = LoggerFactory + .getLogger(OpportunisticContainerContext.class); + + // Currently just used to keep track of allocated containers. + // Can be used for reporting stats later. + private Set containersAllocated = new HashSet<>(); + private AllocationParams appParams = + new AllocationParams(); + private ContainerIdGenerator containerIdGenerator = + new ContainerIdGenerator(); + + private Map nodeMap = new LinkedHashMap<>(); + + // Mapping of NodeId to NodeTokens. Populated either from RM response or + // generated locally if required. + private Map nodeTokens = new HashMap<>(); + final Set blacklist = new HashSet<>(); + + // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority, + // Resource Name (Host/rack/any) and capability. This mapping is required + // to match a received Container to an outstanding OPPORTUNISTIC + // ResourceRequest (ask). 
+ final TreeMap> + outstandingOpReqs = new TreeMap<>(); + + public Set getContainersAllocated() { + return containersAllocated; + } + + public OpportunisticContainerAllocator.AllocationParams getAppParams() { + return appParams; + } + + public ContainerIdGenerator getContainerIdGenerator() { + return containerIdGenerator; + } + + public void setContainerIdGenerator( + ContainerIdGenerator containerIdGenerator) { + this.containerIdGenerator = containerIdGenerator; + } + + public Map getNodeMap() { + return nodeMap; + } + + public Map getNodeTokens() { + return nodeTokens; + } + + public Set getBlacklist() { + return blacklist; + } + + public TreeMap> + getOutstandingOpReqs() { + return outstandingOpReqs; + } + + /** + * Takes a list of ResourceRequests (asks), extracts the key information viz. + * (Priority, ResourceName, Capability) and adds to the outstanding + * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce + * the current YARN constraint that only a single ResourceRequest can exist at + * a give Priority and Capability. + * + * @param resourceAsks the list with the {@link ResourceRequest}s + */ + public void addToOutstandingReqs(List resourceAsks) { + for (ResourceRequest request : resourceAsks) { + Priority priority = request.getPriority(); + + // TODO: Extend for Node/Rack locality. We only handle ANY requests now + if (!ResourceRequest.isAnyLocation(request.getResourceName())) { + continue; + } + + if (request.getNumContainers() == 0) { + continue; + } + + Map reqMap = + outstandingOpReqs.get(priority); + if (reqMap == null) { + reqMap = new HashMap<>(); + outstandingOpReqs.put(priority, reqMap); + } + + ResourceRequest resourceRequest = reqMap.get(request.getCapability()); + if (resourceRequest == null) { + resourceRequest = request; + reqMap.put(request.getCapability(), request); + } else { + resourceRequest.setNumContainers( + resourceRequest.getNumContainers() + request.getNumContainers()); + } + if (ResourceRequest.isAnyLocation(request.getResourceName())) { + LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority + + ", with capability = " + request.getCapability() + " ) : " + + resourceRequest.getNumContainers()); + } + } + } + + /** + * This method matches a returned list of Container Allocations to any + * outstanding OPPORTUNISTIC ResourceRequest. 
+ */ + public void matchAllocationToOutstandingRequest(Resource capability, + List allocatedContainers) { + for (Container c : allocatedContainers) { + containersAllocated.add(c.getId()); + Map asks = + outstandingOpReqs.get(c.getPriority()); + + if (asks == null) + continue; + + ResourceRequest rr = asks.get(capability); + if (rr != null) { + rr.setNumContainers(rr.getNumContainers() - 1); + if (rr.getNumContainers() == 0) { + asks.remove(capability); + } + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 131eaa3..88bc29c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; -import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 5bfbb8d..d624ef5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -72,7 +72,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; -import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; @@ -336,7 +336,7 @@ protected void serviceInit(Configuration conf) throws Exception { addService(nodeHealthChecker); boolean isDistSchedulingEnabled = - conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, + conf.getBoolean(YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, 
YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT); this.context = createNMContext(containerTokenSecretManager, @@ -370,8 +370,8 @@ protected void serviceInit(Configuration conf) throws Exception { ((NMContext) context).setWebServer(webServer); ((NMContext) context).setQueueableContainerAllocator( - new OpportunisticContainerAllocator(nodeStatusUpdater, context, - webServer.getPort())); + new OpportunisticContainerAllocator( + context.getContainerTokenSecretManager(), webServer.getPort())); dispatcher.register(ContainerManagerEventType.class, containerManager); dispatcher.register(NodeManagerEventType.class, this); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java index 75fe022..efbdfb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java @@ -62,7 +62,7 @@ AbstractRequestInterceptor { private static final Logger LOG = LoggerFactory .getLogger(DefaultRequestInterceptor.class); - private DistributedSchedulingAMProtocol rmClient; + private ApplicationMasterProtocol rmClient; private UserGroupInformation user = null; @Override @@ -76,15 +76,7 @@ public void init(AMRMProxyApplicationContext appContext) { user.addToken(appContext.getAMRMToken()); final Configuration conf = this.getConf(); - rmClient = user.doAs( - new PrivilegedExceptionAction() { - @Override - public DistributedSchedulingAMProtocol run() throws Exception { - setAMRMTokenService(conf); - return ServerRMProxy.createRMProxy(conf, - DistributedSchedulingAMProtocol.class); - } - }); + rmClient = createRMClient(appContext, conf); } catch (IOException e) { String message = "Error while creating of RM app master service proxy for attemptId:" @@ -100,6 +92,32 @@ public DistributedSchedulingAMProtocol run() throws Exception { } } + private ApplicationMasterProtocol createRMClient( + AMRMProxyApplicationContext appContext, final Configuration conf) + throws IOException, InterruptedException { + if (appContext.getNMCotext().isDistributedSchedulingEnabled()) { + return user.doAs( + new PrivilegedExceptionAction() { + @Override + public DistributedSchedulingAMProtocol run() throws Exception { + setAMRMTokenService(conf); + return ServerRMProxy.createRMProxy(conf, + DistributedSchedulingAMProtocol.class); + } + }); + } else { + return user.doAs( + new PrivilegedExceptionAction() { + @Override + public ApplicationMasterProtocol run() throws Exception { + setAMRMTokenService(conf); + return ClientRMProxy.createRMProxy(conf, + ApplicationMasterProtocol.class); + } + }); + } + } + @Override public RegisterApplicationMasterResponse registerApplicationMaster( final RegisterApplicationMasterRequest request) @@ -127,9 +145,15 @@ public AllocateResponse allocate(final AllocateRequest request) registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequest request) throws YarnException, IOException { - LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" + - "request to the real 
YARN RM"); - return rmClient.registerApplicationMasterForDistributedScheduling(request); + if (getApplicationContext().getNMCotext() + .isDistributedSchedulingEnabled()) { + LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" + + "request to the real YARN RM"); + return ((DistributedSchedulingAMProtocol)rmClient) + .registerApplicationMasterForDistributedScheduling(request); + } else { + throw new YarnException("Distributed Scheduling is not enabled !!"); + } } @Override @@ -140,13 +164,18 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( LOG.debug("Forwarding allocateForDistributedScheduling request" + "to the real YARN RM"); } - DistributedSchedulingAllocateResponse allocateResponse = - rmClient.allocateForDistributedScheduling(request); - if (allocateResponse.getAllocateResponse().getAMRMToken() != null) { - updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken()); + if (getApplicationContext().getNMCotext() + .isDistributedSchedulingEnabled()) { + DistributedSchedulingAllocateResponse allocateResponse = + ((DistributedSchedulingAMProtocol)rmClient) + .allocateForDistributedScheduling(request); + if (allocateResponse.getAllocateResponse().getAMRMToken() != null) { + updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken()); + } + return allocateResponse; + } else { + throw new YarnException("Distributed Scheduling is not enabled !!"); } - - return allocateResponse; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java index bfb12ee..e920892 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java @@ -32,34 +32,23 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; -import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor; - - - import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; + +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import 
java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; /** *

The DistributedScheduler runs on the NodeManager and is modeled as an @@ -76,74 +65,46 @@ */ public final class DistributedScheduler extends AbstractRequestInterceptor { - static class PartitionedResourceRequests { - private List guaranteed = new ArrayList<>(); - private List opportunistic = new ArrayList<>(); - public List getGuaranteed() { - return guaranteed; - } - public List getOpportunistic() { - return opportunistic; - } - } - - static class DistributedSchedulerParams { - Resource maxResource; - Resource minResource; - Resource incrementResource; - int containerTokenExpiryInterval; - } - private static final Logger LOG = LoggerFactory .getLogger(DistributedScheduler.class); private final static RecordFactory RECORD_FACTORY = RecordFactoryProvider.getRecordFactory(null); - // Currently just used to keep track of allocated containers. - // Can be used for reporting stats later. - private Set containersAllocated = new HashSet<>(); - - private DistributedSchedulerParams appParams = - new DistributedSchedulerParams(); - private final OpportunisticContainerAllocator.ContainerIdCounter - containerIdCounter = - new OpportunisticContainerAllocator.ContainerIdCounter(); - private Map nodeList = new LinkedHashMap<>(); - - // Mapping of NodeId to NodeTokens. Populated either from RM response or - // generated locally if required. - private Map nodeTokens = new HashMap<>(); - final Set blacklist = new HashSet<>(); - - // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority, - // Resource Name (Host/rack/any) and capability. This mapping is required - // to match a received Container to an outstanding OPPORTUNISTIC - // ResourceRequest (ask). - final TreeMap> - outstandingOpReqs = new TreeMap<>(); + private OpportunisticContainerContext appContext = + new OpportunisticContainerContext(); private ApplicationAttemptId applicationAttemptId; private OpportunisticContainerAllocator containerAllocator; private NMTokenSecretManagerInNM nmSecretManager; private String appSubmitter; + private long rmIdentifier; public void init(AMRMProxyApplicationContext appContext) { super.init(appContext); - initLocal(appContext.getApplicationAttemptId(), + initLocal(appContext.getNMCotext().getNodeStatusUpdater().getRMIdentifier(), + appContext.getApplicationAttemptId(), appContext.getNMCotext().getContainerAllocator(), appContext.getNMCotext().getNMTokenSecretManager(), appContext.getUser()); } @VisibleForTesting - void initLocal(ApplicationAttemptId applicationAttemptId, + void initLocal(long rmIdentifier, ApplicationAttemptId applicationAttemptId, OpportunisticContainerAllocator containerAllocator, NMTokenSecretManagerInNM nmSecretManager, String appSubmitter) { + this.rmIdentifier = rmIdentifier; this.applicationAttemptId = applicationAttemptId; this.containerAllocator = containerAllocator; this.nmSecretManager = nmSecretManager; this.appSubmitter = appSubmitter; + this.appContext.setContainerIdGenerator( + new OpportunisticContainerAllocator.ContainerIdGenerator() { + @Override + public long generateContainerId() { + return this.containerIdCounter.decrementAndGet(); + } + }); } /** @@ -202,7 +163,7 @@ private void updateResponseWithNMTokens(AllocateResponse response, if (allocatedContainers.size() > 0) { response.getAllocatedContainers().addAll(allocatedContainers); for (Container alloc : allocatedContainers) { - if (!nodeTokens.containsKey(alloc.getNodeId())) { + if (!appContext.getNodeTokens().containsKey(alloc.getNodeId())) { newTokens.add(nmSecretManager.generateNMToken(appSubmitter, 
alloc)); } } @@ -212,115 +173,34 @@ private void updateResponseWithNMTokens(AllocateResponse response, } } - private PartitionedResourceRequests partitionAskList(List - askList) { - PartitionedResourceRequests partitionedRequests = - new PartitionedResourceRequests(); - for (ResourceRequest rr : askList) { - if (rr.getExecutionTypeRequest().getExecutionType() == - ExecutionType.OPPORTUNISTIC) { - partitionedRequests.getOpportunistic().add(rr); - } else { - partitionedRequests.getGuaranteed().add(rr); - } - } - return partitionedRequests; - } - private void updateParameters( RegisterDistributedSchedulingAMResponse registerResponse) { - appParams.minResource = registerResponse.getMinContainerResource(); - appParams.maxResource = registerResponse.getMaxContainerResource(); - appParams.incrementResource = - registerResponse.getIncrContainerResource(); - if (appParams.incrementResource == null) { - appParams.incrementResource = appParams.minResource; + appContext.getAppParams().setMinResource( + registerResponse.getMinContainerResource()); + appContext.getAppParams().setMaxResource( + registerResponse.getMaxContainerResource()); + appContext.getAppParams().setIncrementResource( + registerResponse.getIncrContainerResource()); + if (appContext.getAppParams().getIncrementResource() == null) { + appContext.getAppParams().setIncrementResource( + appContext.getAppParams().getMinResource()); } - appParams.containerTokenExpiryInterval = registerResponse - .getContainerTokenExpiryInterval(); + appContext.getAppParams().setContainerTokenExpiryInterval( + registerResponse.getContainerTokenExpiryInterval()); - containerIdCounter + appContext.getContainerIdGenerator() .resetContainerIdCounter(registerResponse.getContainerIdStart()); setNodeList(registerResponse.getNodesForScheduling()); } - /** - * Takes a list of ResourceRequests (asks), extracts the key information viz. - * (Priority, ResourceName, Capability) and adds to the outstanding - * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce - * the current YARN constraint that only a single ResourceRequest can exist at - * a give Priority and Capability. - * - * @param resourceAsks the list with the {@link ResourceRequest}s - */ - public void addToOutstandingReqs(List resourceAsks) { - for (ResourceRequest request : resourceAsks) { - Priority priority = request.getPriority(); - - // TODO: Extend for Node/Rack locality. We only handle ANY requests now - if (!ResourceRequest.isAnyLocation(request.getResourceName())) { - continue; - } - - if (request.getNumContainers() == 0) { - continue; - } - - Map reqMap = - this.outstandingOpReqs.get(priority); - if (reqMap == null) { - reqMap = new HashMap<>(); - this.outstandingOpReqs.put(priority, reqMap); - } - - ResourceRequest resourceRequest = reqMap.get(request.getCapability()); - if (resourceRequest == null) { - resourceRequest = request; - reqMap.put(request.getCapability(), request); - } else { - resourceRequest.setNumContainers( - resourceRequest.getNumContainers() + request.getNumContainers()); - } - if (ResourceRequest.isAnyLocation(request.getResourceName())) { - LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority - + ", with capability = " + request.getCapability() + " ) : " - + resourceRequest.getNumContainers()); - } - } - } - - /** - * This method matches a returned list of Container Allocations to any - * outstanding OPPORTUNISTIC ResourceRequest. 
- */ - private void matchAllocationToOutstandingRequest(Resource capability, - List allocatedContainers) { - for (Container c : allocatedContainers) { - containersAllocated.add(c.getId()); - Map asks = - outstandingOpReqs.get(c.getPriority()); - - if (asks == null) - continue; - - ResourceRequest rr = asks.get(capability); - if (rr != null) { - rr.setNumContainers(rr.getNumContainers() - 1); - if (rr.getNumContainers() == 0) { - asks.remove(capability); - } - } - } - } - private void setNodeList(List nodeList) { - this.nodeList.clear(); + appContext.getNodeMap().clear(); addToNodeList(nodeList); } private void addToNodeList(List nodes) { for (NodeId n : nodes) { - this.nodeList.put(n.getHost(), n); + appContext.getNodeMap().put(n.getHost(), n); } } @@ -345,52 +225,13 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( LOG.debug("Forwarding allocate request to the" + "Distributed Scheduler Service on YARN RM"); } - // Partition requests into GUARANTEED and OPPORTUNISTIC reqs - PartitionedResourceRequests partitionedAsks = - partitionAskList(request.getAllocateRequest().getAskList()); - - List releasedContainers = - request.getAllocateRequest().getReleaseList(); - int numReleasedContainers = releasedContainers.size(); - if (numReleasedContainers > 0) { - LOG.info("AttemptID: " + applicationAttemptId + " released: " - + numReleasedContainers); - containersAllocated.removeAll(releasedContainers); - } - - // Also, update black list - ResourceBlacklistRequest rbr = - request.getAllocateRequest().getResourceBlacklistRequest(); - if (rbr != null) { - blacklist.removeAll(rbr.getBlacklistRemovals()); - blacklist.addAll(rbr.getBlacklistAdditions()); - } - - // Add OPPORTUNISTIC reqs to the outstanding reqs - addToOutstandingReqs(partitionedAsks.getOpportunistic()); - - List allocatedContainers = new ArrayList<>(); - for (Priority priority : outstandingOpReqs.descendingKeySet()) { - // Allocated containers : - // Key = Requested Capability, - // Value = List of Containers of given Cap (The actual container size - // might be different than what is requested.. 
which is why - // we need the requested capability (key) to match against - // the outstanding reqs) - Map> allocated = - containerAllocator.allocate(this.appParams, containerIdCounter, - outstandingOpReqs.get(priority).values(), blacklist, - applicationAttemptId, nodeList, appSubmitter); - for (Map.Entry> e : allocated.entrySet()) { - matchAllocationToOutstandingRequest(e.getKey(), e.getValue()); - allocatedContainers.addAll(e.getValue()); - } - } + List allocatedContainers = + containerAllocator.allocateContainers( + request.getAllocateRequest(), applicationAttemptId, + appContext, rmIdentifier, appSubmitter); request.setAllocatedContainers(allocatedContainers); - // Send all the GUARANTEED Reqs to RM - request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed()); DistributedSchedulingAllocateResponse dsResp = getNextInterceptor().allocateForDistributedScheduling(request); @@ -398,7 +239,7 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( setNodeList(dsResp.getNodesForScheduling()); List nmTokens = dsResp.getAllocateResponse().getNMTokens(); for (NMToken nmToken : nmTokens) { - nodeTokens.put(nmToken.getNodeId(), nmToken); + appContext.getNodeTokens().put(nmToken.getNodeId(), nmToken); } List completedContainers = @@ -407,7 +248,7 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( // Only account for opportunistic containers for (ContainerStatus cs : completedContainers) { if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) { - containersAllocated.remove(cs.getContainerId()); + appContext.getContainersAllocated().remove(cs.getContainerId()); } } @@ -419,7 +260,7 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( if (LOG.isDebugEnabled()) { LOG.debug( "Number of opportunistic containers currently allocated by" + - "application: " + containersAllocated.size()); + "application: " + appContext.getContainersAllocated().size()); } return dsResp; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java deleted file mode 100644 index 4723233..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.nodemanager.scheduler; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.yarn.api.records.*; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; -import org.apache.hadoop.yarn.server.api.ContainerType; -import org.apache.hadoop.yarn.server.nodemanager.Context; -import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; -import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler.DistributedSchedulerParams; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; - -import java.net.InetSocketAddress; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.atomic.AtomicLong; - -/** - *

- * The OpportunisticContainerAllocator allocates containers on a given list of - * nodes, after modifying the container sizes to respect the limits set by the - * ResourceManager. It tries to distribute the containers as evenly as possible. - * It also uses the NMTokenSecretManagerInNM to generate the - * required NM tokens for the allocated containers. - *

- */ -public class OpportunisticContainerAllocator { - - private static final Log LOG = - LogFactory.getLog(OpportunisticContainerAllocator.class); - - private static final ResourceCalculator RESOURCE_CALCULATOR = - new DominantResourceCalculator(); - - static class ContainerIdCounter { - final AtomicLong containerIdCounter = new AtomicLong(1); - - void resetContainerIdCounter(long containerIdStart) { - this.containerIdCounter.set(containerIdStart); - } - - long generateContainerId() { - return this.containerIdCounter.decrementAndGet(); - } - } - - private final NodeStatusUpdater nodeStatusUpdater; - private final Context context; - private int webpagePort; - - public OpportunisticContainerAllocator(NodeStatusUpdater nodeStatusUpdater, - Context context, int webpagePort) { - this.nodeStatusUpdater = nodeStatusUpdater; - this.context = context; - this.webpagePort = webpagePort; - } - - public Map> allocate( - DistributedSchedulerParams appParams, ContainerIdCounter idCounter, - Collection resourceAsks, Set blacklist, - ApplicationAttemptId appAttId, Map allNodes, - String userName) throws YarnException { - Map> containers = new HashMap<>(); - for (ResourceRequest anyAsk : resourceAsks) { - allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId, - allNodes, userName, containers, anyAsk); - LOG.info("Opportunistic allocation requested for [" - + "priority=" + anyAsk.getPriority() - + ", num_containers=" + anyAsk.getNumContainers() - + ", capability=" + anyAsk.getCapability() + "]" - + " allocated = " + containers.get(anyAsk.getCapability()).size()); - } - return containers; - } - - private void allocateOpportunisticContainers( - DistributedSchedulerParams appParams, ContainerIdCounter idCounter, - Set blacklist, ApplicationAttemptId id, - Map allNodes, String userName, - Map> containers, ResourceRequest anyAsk) - throws YarnException { - int toAllocate = anyAsk.getNumContainers() - - (containers.isEmpty() ? 0 : - containers.get(anyAsk.getCapability()).size()); - - List nodesForScheduling = new ArrayList<>(); - for (Entry nodeEntry : allNodes.entrySet()) { - // Do not use blacklisted nodes for scheduling. 
- if (blacklist.contains(nodeEntry.getKey())) { - continue; - } - nodesForScheduling.add(nodeEntry.getValue()); - } - int numAllocated = 0; - int nextNodeToSchedule = 0; - for (int numCont = 0; numCont < toAllocate; numCont++) { - nextNodeToSchedule++; - nextNodeToSchedule %= nodesForScheduling.size(); - NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule); - Container container = buildContainer(appParams, idCounter, anyAsk, id, - userName, nodeId); - List cList = containers.get(anyAsk.getCapability()); - if (cList == null) { - cList = new ArrayList<>(); - containers.put(anyAsk.getCapability(), cList); - } - cList.add(container); - numAllocated++; - LOG.info("Allocated [" + container.getId() + "] as opportunistic."); - } - LOG.info("Allocated " + numAllocated + " opportunistic containers."); - } - - private Container buildContainer(DistributedSchedulerParams appParams, - ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id, - String userName, NodeId nodeId) throws YarnException { - ContainerId cId = - ContainerId.newContainerId(id, idCounter.generateContainerId()); - - // Normalize the resource asks (Similar to what the the RM scheduler does - // before accepting an ask) - Resource capability = normalizeCapability(appParams, rr); - - long currTime = System.currentTimeMillis(); - ContainerTokenIdentifier containerTokenIdentifier = - new ContainerTokenIdentifier( - cId, nodeId.getHost() + ":" + nodeId.getPort(), userName, - capability, currTime + appParams.containerTokenExpiryInterval, - context.getContainerTokenSecretManager().getCurrentKey().getKeyId(), - nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime, - null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK, - ExecutionType.OPPORTUNISTIC); - byte[] pwd = - context.getContainerTokenSecretManager().createPassword( - containerTokenIdentifier); - Token containerToken = newContainerToken(nodeId, pwd, - containerTokenIdentifier); - Container container = BuilderUtils.newContainer( - cId, nodeId, nodeId.getHost() + ":" + webpagePort, - capability, rr.getPriority(), containerToken, - containerTokenIdentifier.getExecutionType(), - rr.getAllocationRequestId()); - return container; - } - - private Resource normalizeCapability(DistributedSchedulerParams appParams, - ResourceRequest ask) { - return Resources.normalize(RESOURCE_CALCULATOR, - ask.getCapability(), appParams.minResource, appParams.maxResource, - appParams.incrementResource); - } - - public static Token newContainerToken(NodeId nodeId, byte[] password, - ContainerTokenIdentifier tokenIdentifier) { - // RPC layer client expects ip:port as service for tokens - InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(), - nodeId.getPort()); - // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token - Token containerToken = Token.newInstance(tokenIdentifier.getBytes(), - ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil - .buildTokenService(addr).toString()); - return containerToken; - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index f716d44..d8660dd 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -67,7 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; -import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java index b093b3b..8f1ae7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java @@ -38,11 +38,12 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; import org.apache.hadoop.yarn.server.api.records.MasterKey; -import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; + +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; @@ -189,7 +190,6 @@ private RequestInterceptor setup(Configuration conf, DistributedScheduler distributedScheduler) { NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class); Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l); - Context context = Mockito.mock(Context.class); NMContainerTokenSecretManager nmContainerTokenSecretManager = new NMContainerTokenSecretManager(conf); MasterKey mKey = new MasterKey() { @@ -207,15 +207,13 @@ public ByteBuffer getBytes() { public void setBytes(ByteBuffer bytes) {} }; nmContainerTokenSecretManager.setMasterKey(mKey); - Mockito.when(context.getContainerTokenSecretManager()).thenReturn - (nmContainerTokenSecretManager); OpportunisticContainerAllocator containerAllocator = - new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777); + new 
OpportunisticContainerAllocator(nmContainerTokenSecretManager, 77); NMTokenSecretManagerInNM nmTokenSecretManagerInNM = new NMTokenSecretManagerInNM(); nmTokenSecretManagerInNM.setMasterKey(mKey); - distributedScheduler.initLocal( + distributedScheduler.initLocal(1234, ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1), containerAllocator, nmTokenSecretManagerInNM, "test"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java deleted file mode 100644 index 843ac09..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java +++ /dev/null @@ -1,361 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
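The TestDistributedScheduler hunk above shows the shape of the refactored allocator API: it is now constructed from just a container-token secret manager and the NM webapp port, and the RM identifier is passed explicitly to initLocal() instead of being read from a mocked NodeStatusUpdater and Context. A hedged consolidation of that wiring is sketched below; the method parameters stand in for the objects the test builds, the values 77, 1234 and "test" are the test's own illustrative values, and the imports are the ones already present in TestDistributedScheduler.

// Hedged consolidation of the post-refactor setup exercised by the test.
static void wireUpDistributedScheduler(Configuration conf, MasterKey masterKey,
    DistributedScheduler distributedScheduler) {
  NMContainerTokenSecretManager containerTokenSM =
      new NMContainerTokenSecretManager(conf);
  containerTokenSM.setMasterKey(masterKey);

  // No NM Context or NodeStatusUpdater is needed any more:
  OpportunisticContainerAllocator allocator =
      new OpportunisticContainerAllocator(containerTokenSM, 77);

  NMTokenSecretManagerInNM nmTokenSM = new NMTokenSecretManagerInNM();
  nmTokenSM.setMasterKey(masterKey);

  // The RM identifier (1234) is now an explicit argument:
  distributedScheduler.initLocal(1234,
      ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
      allocator, nmTokenSM, "test");
}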
- */ - -package org.apache.hadoop.yarn.server.resourcemanager; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol; -import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl; - - -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; - -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService; - -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor; - - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -/** - * The DistributedSchedulingAMService is started instead of the - * ApplicationMasterService if distributed scheduling is enabled for the YARN - * cluster. 
- * It extends the functionality of the ApplicationMasterService by servicing - * clients (AMs and AMRMProxy request interceptors) that understand the - * DistributedSchedulingProtocol. - */ -public class DistributedSchedulingAMService extends ApplicationMasterService - implements DistributedSchedulingAMProtocol, EventHandler { - - private static final Log LOG = - LogFactory.getLog(DistributedSchedulingAMService.class); - - private final NodeQueueLoadMonitor nodeMonitor; - - private final ConcurrentHashMap> rackToNode = - new ConcurrentHashMap<>(); - private final ConcurrentHashMap> hostToNode = - new ConcurrentHashMap<>(); - private final int k; - - public DistributedSchedulingAMService(RMContext rmContext, - YarnScheduler scheduler) { - super(DistributedSchedulingAMService.class.getName(), rmContext, scheduler); - this.k = rmContext.getYarnConfiguration().getInt( - YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED, - YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT); - long nodeSortInterval = rmContext.getYarnConfiguration().getLong( - YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, - YarnConfiguration. - NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT); - NodeQueueLoadMonitor.LoadComparator comparator = - NodeQueueLoadMonitor.LoadComparator.valueOf( - rmContext.getYarnConfiguration().get( - YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR, - YarnConfiguration. - NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT)); - - NodeQueueLoadMonitor topKSelector = - new NodeQueueLoadMonitor(nodeSortInterval, comparator); - - float sigma = rmContext.getYarnConfiguration() - .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV, - YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT); - - int limitMin, limitMax; - - if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) { - limitMin = rmContext.getYarnConfiguration() - .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH, - YarnConfiguration. - NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT); - limitMax = rmContext.getYarnConfiguration() - .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH, - YarnConfiguration. - NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT); - } else { - limitMin = rmContext.getYarnConfiguration() - .getInt( - YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS, - YarnConfiguration. - NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT); - limitMax = rmContext.getYarnConfiguration() - .getInt( - YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS, - YarnConfiguration. - NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT); - } - - topKSelector.initThresholdCalculator(sigma, limitMin, limitMax); - this.nodeMonitor = topKSelector; - } - - @Override - public Server getServer(YarnRPC rpc, Configuration serverConf, - InetSocketAddress addr, AMRMTokenSecretManager secretManager) { - Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this, - addr, serverConf, secretManager, - serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, - YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT)); - // To support application running on NMs that DO NOT support - // Dist Scheduling... 
The server multiplexes both the - // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol - ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, - ApplicationMasterProtocolPB.class, - ApplicationMasterProtocolService.newReflectiveBlockingService( - new ApplicationMasterProtocolPBServiceImpl(this))); - return server; - } - - @Override - public RegisterApplicationMasterResponse registerApplicationMaster - (RegisterApplicationMasterRequest request) throws YarnException, - IOException { - return super.registerApplicationMaster(request); - } - - @Override - public FinishApplicationMasterResponse finishApplicationMaster - (FinishApplicationMasterRequest request) throws YarnException, - IOException { - return super.finishApplicationMaster(request); - } - - @Override - public AllocateResponse allocate(AllocateRequest request) throws - YarnException, IOException { - return super.allocate(request); - } - - @Override - public RegisterDistributedSchedulingAMResponse - registerApplicationMasterForDistributedScheduling( - RegisterApplicationMasterRequest request) throws YarnException, - IOException { - RegisterApplicationMasterResponse response = - registerApplicationMaster(request); - RegisterDistributedSchedulingAMResponse dsResp = recordFactory - .newRecordInstance(RegisterDistributedSchedulingAMResponse.class); - dsResp.setRegisterResponse(response); - dsResp.setMinContainerResource( - Resource.newInstance( - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB, - YarnConfiguration. - DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT), - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES, - YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT) - ) - ); - dsResp.setMaxContainerResource( - Resource.newInstance( - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB, - YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT), - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES, - YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT) - ) - ); - dsResp.setIncrContainerResource( - Resource.newInstance( - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB, - YarnConfiguration. - DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT), - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES, - YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT) - ) - ); - dsResp.setContainerTokenExpiryInterval( - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS, - YarnConfiguration. 
- DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT)); - dsResp.setContainerIdStart( - this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT); - - // Set nodes to be used for scheduling - dsResp.setNodesForScheduling( - this.nodeMonitor.selectLeastLoadedNodes(this.k)); - return dsResp; - } - - @Override - public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( - DistributedSchedulingAllocateRequest request) - throws YarnException, IOException { - List distAllocContainers = request.getAllocatedContainers(); - for (Container container : distAllocContainers) { - // Create RMContainer - SchedulerApplicationAttempt appAttempt = - ((AbstractYarnScheduler) rmContext.getScheduler()) - .getCurrentAttemptForContainer(container.getId()); - RMContainer rmContainer = new RMContainerImpl(container, - appAttempt.getApplicationAttemptId(), container.getNodeId(), - appAttempt.getUser(), rmContext, true); - appAttempt.addRMContainer(container.getId(), rmContainer); - rmContainer.handle( - new RMContainerEvent(container.getId(), - RMContainerEventType.LAUNCHED)); - } - AllocateResponse response = allocate(request.getAllocateRequest()); - DistributedSchedulingAllocateResponse dsResp = recordFactory - .newRecordInstance(DistributedSchedulingAllocateResponse.class); - dsResp.setAllocateResponse(response); - dsResp.setNodesForScheduling( - this.nodeMonitor.selectLeastLoadedNodes(this.k)); - return dsResp; - } - - private void addToMapping(ConcurrentHashMap> mapping, - String rackName, NodeId nodeId) { - if (rackName != null) { - mapping.putIfAbsent(rackName, new HashSet()); - Set nodeIds = mapping.get(rackName); - synchronized (nodeIds) { - nodeIds.add(nodeId); - } - } - } - - private void removeFromMapping(ConcurrentHashMap> mapping, - String rackName, NodeId nodeId) { - if (rackName != null) { - Set nodeIds = mapping.get(rackName); - synchronized (nodeIds) { - nodeIds.remove(nodeId); - } - } - } - - @Override - public void handle(SchedulerEvent event) { - switch (event.getType()) { - case NODE_ADDED: - if (!(event instanceof NodeAddedSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event; - nodeMonitor.addNode(nodeAddedEvent.getContainerReports(), - nodeAddedEvent.getAddedRMNode()); - addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(), - nodeAddedEvent.getAddedRMNode().getNodeID()); - addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(), - nodeAddedEvent.getAddedRMNode().getNodeID()); - break; - case NODE_REMOVED: - if (!(event instanceof NodeRemovedSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeRemovedSchedulerEvent nodeRemovedEvent = - (NodeRemovedSchedulerEvent) event; - nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode()); - removeFromMapping(rackToNode, - nodeRemovedEvent.getRemovedRMNode().getRackName(), - nodeRemovedEvent.getRemovedRMNode().getNodeID()); - removeFromMapping(hostToNode, - nodeRemovedEvent.getRemovedRMNode().getHostName(), - nodeRemovedEvent.getRemovedRMNode().getNodeID()); - break; - case NODE_UPDATE: - if (!(event instanceof NodeUpdateSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent) - event; - nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode()); - break; - case NODE_RESOURCE_UPDATE: - if (!(event instanceof NodeResourceUpdateSchedulerEvent)) { - 
throw new RuntimeException("Unexpected event type: " + event); - } - NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = - (NodeResourceUpdateSchedulerEvent) event; - nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), - nodeResourceUpdatedEvent.getResourceOption()); - break; - - // <-- IGNORED EVENTS : START --> - case APP_ADDED: - break; - case APP_REMOVED: - break; - case APP_ATTEMPT_ADDED: - break; - case APP_ATTEMPT_REMOVED: - break; - case CONTAINER_EXPIRED: - break; - case NODE_LABELS_UPDATE: - break; - // <-- IGNORED EVENTS : END --> - default: - LOG.error("Unknown event arrived at DistributedSchedulingAMService: " - + event.toString()); - } - - } - - public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { - return nodeMonitor.getThresholdCalculator(); - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatingAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatingAMService.java new file mode 100644 index 0000000..9101130 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatingAMService.java @@ -0,0 +1,367 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
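Both the removed service above and its replacement below keep auxiliary rack-to-nodes and host-to-nodes maps, maintained from NODE_ADDED and NODE_REMOVED scheduler events using a putIfAbsent-then-synchronized-set pattern. A standalone sketch of that bookkeeping follows, with String keys and values standing in for the YARN types; the null check on removal is my defensive addition, the patch assumes the bucket already exists.

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Standalone sketch of the rack/host -> nodes bookkeeping in the AM service.
final class NodeMappingSketch {
  private final ConcurrentHashMap<String, Set<String>> rackToNodes =
      new ConcurrentHashMap<>();

  void addNode(String rackName, String nodeId) {
    if (rackName != null) {
      rackToNodes.putIfAbsent(rackName, new HashSet<String>());
      Set<String> nodes = rackToNodes.get(rackName);
      synchronized (nodes) {        // per-bucket locking, as in the patch
        nodes.add(nodeId);
      }
    }
  }

  void removeNode(String rackName, String nodeId) {
    if (rackName != null) {
      Set<String> nodes = rackToNodes.get(rackName);
      if (nodes != null) {          // defensive; the patch does not check
        synchronized (nodes) {
          nodes.remove(nodeId);
        }
      }
    }
  }
}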
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol; +import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl; + + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService; + +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor; + + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The OpportunisticContainerAllocatingAMService is started instead of the + * ApplicationMasterService if distributed scheduling is enabled for the YARN + * cluster. 
+ * It extends the functionality of the ApplicationMasterService by servicing + * clients (AMs and AMRMProxy request interceptors) that understand the + * DistributedSchedulingProtocol. + */ +public class OpportunisticContainerAllocatingAMService + extends ApplicationMasterService implements DistributedSchedulingAMProtocol, + EventHandler { + + private static final Log LOG = + LogFactory.getLog(OpportunisticContainerAllocatingAMService.class); + + private final NodeQueueLoadMonitor nodeMonitor; + + private final ConcurrentHashMap> rackToNode = + new ConcurrentHashMap<>(); + private final ConcurrentHashMap> hostToNode = + new ConcurrentHashMap<>(); + private final int k; + + public OpportunisticContainerAllocatingAMService(RMContext rmContext, + YarnScheduler scheduler) { + super(OpportunisticContainerAllocatingAMService.class.getName(), + rmContext, scheduler); + this.k = rmContext.getYarnConfiguration().getInt( + YarnConfiguration.OPPORTUNISTIC_SCHEDULING_NODES_NUMBER_USED, + YarnConfiguration.OPPORTUNISTIC_SCHEDULING_NODES_NUMBER_USED_DEFAULT); + long nodeSortInterval = rmContext.getYarnConfiguration().getLong( + YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, + YarnConfiguration. + NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT); + NodeQueueLoadMonitor.LoadComparator comparator = + NodeQueueLoadMonitor.LoadComparator.valueOf( + rmContext.getYarnConfiguration().get( + YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR, + YarnConfiguration. + NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT)); + + NodeQueueLoadMonitor topKSelector = + new NodeQueueLoadMonitor(nodeSortInterval, comparator); + + float sigma = rmContext.getYarnConfiguration() + .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV, + YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT); + + int limitMin, limitMax; + + if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) { + limitMin = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH, + YarnConfiguration. + NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT); + limitMax = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH, + YarnConfiguration. + NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT); + } else { + limitMin = rmContext.getYarnConfiguration() + .getInt( + YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS, + YarnConfiguration. + NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT); + limitMax = rmContext.getYarnConfiguration() + .getInt( + YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS, + YarnConfiguration. + NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT); + } + + topKSelector.initThresholdCalculator(sigma, limitMin, limitMax); + this.nodeMonitor = topKSelector; + } + + @Override + public Server getServer(YarnRPC rpc, Configuration serverConf, + InetSocketAddress addr, AMRMTokenSecretManager secretManager) { + if (YarnConfiguration.isDistSchedulingEnabled(serverConf)) { + Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this, + addr, serverConf, secretManager, + serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT)); + // To support application running on NMs that DO NOT support + // Dist Scheduling... 
The server multiplexes both the + // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol + ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, + ApplicationMasterProtocolPB.class, + ApplicationMasterProtocolService.newReflectiveBlockingService( + new ApplicationMasterProtocolPBServiceImpl(this))); + return server; + } + return super.getServer(rpc, serverConf, addr, secretManager); + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster + (RegisterApplicationMasterRequest request) throws YarnException, + IOException { + return super.registerApplicationMaster(request); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster + (FinishApplicationMasterRequest request) throws YarnException, + IOException { + return super.finishApplicationMaster(request); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) throws + YarnException, IOException { + return super.allocate(request); + } + + @Override + public RegisterDistributedSchedulingAMResponse + registerApplicationMasterForDistributedScheduling( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + RegisterApplicationMasterResponse response = + registerApplicationMaster(request); + RegisterDistributedSchedulingAMResponse dsResp = recordFactory + .newRecordInstance(RegisterDistributedSchedulingAMResponse.class); + dsResp.setRegisterResponse(response); + dsResp.setMinContainerResource( + Resource.newInstance( + getConfig().getInt( + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB, + YarnConfiguration. + OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT), + getConfig().getInt( + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES, + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT) + ) + ); + dsResp.setMaxContainerResource( + Resource.newInstance( + getConfig().getInt( + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB, + YarnConfiguration + .OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT), + getConfig().getInt( + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES, + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT) + ) + ); + dsResp.setIncrContainerResource( + Resource.newInstance( + getConfig().getInt( + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB, + YarnConfiguration. + OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT), + getConfig().getInt( + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES, + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT) + ) + ); + dsResp.setContainerTokenExpiryInterval( + getConfig().getInt( + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS, + YarnConfiguration. 
+ OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT)); + dsResp.setContainerIdStart( + this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT); + + // Set nodes to be used for scheduling + dsResp.setNodesForScheduling( + this.nodeMonitor.selectLeastLoadedNodes(this.k)); + return dsResp; + } + + @Override + public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( + DistributedSchedulingAllocateRequest request) + throws YarnException, IOException { + List distAllocContainers = request.getAllocatedContainers(); + for (Container container : distAllocContainers) { + // Create RMContainer + SchedulerApplicationAttempt appAttempt = + ((AbstractYarnScheduler) rmContext.getScheduler()) + .getCurrentAttemptForContainer(container.getId()); + RMContainer rmContainer = new RMContainerImpl(container, + appAttempt.getApplicationAttemptId(), container.getNodeId(), + appAttempt.getUser(), rmContext, true); + appAttempt.addRMContainer(container.getId(), rmContainer); + rmContainer.handle( + new RMContainerEvent(container.getId(), + RMContainerEventType.LAUNCHED)); + } + AllocateResponse response = allocate(request.getAllocateRequest()); + DistributedSchedulingAllocateResponse dsResp = recordFactory + .newRecordInstance(DistributedSchedulingAllocateResponse.class); + dsResp.setAllocateResponse(response); + dsResp.setNodesForScheduling( + this.nodeMonitor.selectLeastLoadedNodes(this.k)); + return dsResp; + } + + private void addToMapping(ConcurrentHashMap> mapping, + String rackName, NodeId nodeId) { + if (rackName != null) { + mapping.putIfAbsent(rackName, new HashSet()); + Set nodeIds = mapping.get(rackName); + synchronized (nodeIds) { + nodeIds.add(nodeId); + } + } + } + + private void removeFromMapping(ConcurrentHashMap> mapping, + String rackName, NodeId nodeId) { + if (rackName != null) { + Set nodeIds = mapping.get(rackName); + synchronized (nodeIds) { + nodeIds.remove(nodeId); + } + } + } + + @Override + public void handle(SchedulerEvent event) { + switch (event.getType()) { + case NODE_ADDED: + if (!(event instanceof NodeAddedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event; + nodeMonitor.addNode(nodeAddedEvent.getContainerReports(), + nodeAddedEvent.getAddedRMNode()); + addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(), + nodeAddedEvent.getAddedRMNode().getNodeID()); + addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(), + nodeAddedEvent.getAddedRMNode().getNodeID()); + break; + case NODE_REMOVED: + if (!(event instanceof NodeRemovedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeRemovedSchedulerEvent nodeRemovedEvent = + (NodeRemovedSchedulerEvent) event; + nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode()); + removeFromMapping(rackToNode, + nodeRemovedEvent.getRemovedRMNode().getRackName(), + nodeRemovedEvent.getRemovedRMNode().getNodeID()); + removeFromMapping(hostToNode, + nodeRemovedEvent.getRemovedRMNode().getHostName(), + nodeRemovedEvent.getRemovedRMNode().getNodeID()); + break; + case NODE_UPDATE: + if (!(event instanceof NodeUpdateSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent) + event; + nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode()); + break; + case NODE_RESOURCE_UPDATE: + if (!(event instanceof NodeResourceUpdateSchedulerEvent)) { + throw 
new RuntimeException("Unexpected event type: " + event); + } + NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = + (NodeResourceUpdateSchedulerEvent) event; + nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), + nodeResourceUpdatedEvent.getResourceOption()); + break; + + // <-- IGNORED EVENTS : START --> + case APP_ADDED: + break; + case APP_REMOVED: + break; + case APP_ATTEMPT_ADDED: + break; + case APP_ATTEMPT_REMOVED: + break; + case CONTAINER_EXPIRED: + break; + case NODE_LABELS_UPDATE: + break; + // <-- IGNORED EVENTS : END --> + default: + LOG.error("Unknown event arrived at" + + "OpportunisticContainerAllocatingAMService: " + event.toString()); + } + + } + + public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { + return nodeMonitor.getThresholdCalculator(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 4509045..5a726de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -116,7 +116,6 @@ import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApps; import org.apache.hadoop.yarn.webapp.WebApps.Builder; @@ -1177,24 +1176,27 @@ protected ClientRMService createClientRMService() { } protected ApplicationMasterService createApplicationMasterService() { - if (this.rmContext.getYarnConfiguration().getBoolean( - YarnConfiguration.DIST_SCHEDULING_ENABLED, - YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) { - DistributedSchedulingAMService distributedSchedulingService = new - DistributedSchedulingAMService(this.rmContext, scheduler); - EventDispatcher distSchedulerEventDispatcher = - new EventDispatcher(distributedSchedulingService, - DistributedSchedulingAMService.class.getName()); - // Add an event dispatcher for the DistributedSchedulingAMService - // to handle node updates/additions and removals. + Configuration conf = this.rmContext.getYarnConfiguration(); + if (YarnConfiguration.isOpportunisticContainerAllocationEnabled(conf) + || YarnConfiguration.isDistSchedulingEnabled(conf)) { + OpportunisticContainerAllocatingAMService + oppContainerAllocatingAMService = + new OpportunisticContainerAllocatingAMService(this.rmContext, + scheduler); + EventDispatcher oppContainerAllocEventDispatcher = + new EventDispatcher(oppContainerAllocatingAMService, + OpportunisticContainerAllocatingAMService.class.getName()); + // Add an event dispatcher for the + // OpportunisticContainerAllocatingAMService to handle node + // updates/additions and removals. // Since the SchedulerEvent is currently a super set of theses, // we register interest for it.. 
- addService(distSchedulerEventDispatcher); + addService(oppContainerAllocEventDispatcher); rmDispatcher.register(SchedulerEventType.class, - distSchedulerEventDispatcher); + oppContainerAllocEventDispatcher); this.rmContext.setContainerQueueLimitCalculator( - distributedSchedulingService.getNodeManagerQueueLimitCalculator()); - return distributedSchedulingService; + oppContainerAllocatingAMService.getNodeManagerQueueLimitCalculator()); + return oppContainerAllocatingAMService; } return new ApplicationMasterService(this.rmContext, scheduler); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 7d1b3c3..7eb6469 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -820,9 +820,9 @@ protected void serviceStop() { @Override protected ApplicationMasterService createApplicationMasterService() { if (this.rmContext.getYarnConfiguration().getBoolean( - YarnConfiguration.DIST_SCHEDULING_ENABLED, + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) { - return new DistributedSchedulingAMService(getRMContext(), scheduler) { + return new OpportunisticContainerAllocatingAMService(getRMContext(), scheduler) { @Override protected void serviceStart() { // override to not start rpc handler diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java index 0213a94..4c9a1c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java @@ -71,11 +71,11 @@ import java.util.List; /** - * Test cases for {@link DistributedSchedulingAMService}. + * Test cases for {@link OpportunisticContainerAllocatingAMService}. 
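As that Javadoc says (and testRPCWrapping below exercises), a single server started by this service must answer both plain ApplicationMasterProtocol clients and DistributedSchedulingAMProtocol clients. A hedged client-side sketch of such a dual-protocol probe follows; serverAddr and conf are assumed to describe the already-started server, and a real invocation additionally needs the AM security context that the test sets up around these calls.

// Hedged client-side sketch: the same endpoint is reachable via both protocols.
static void probeBothProtocols(InetSocketAddress serverAddr, Configuration conf)
    throws Exception {
  YarnRPC rpc = YarnRPC.create(conf);

  ApplicationMasterProtocol amClient = (ApplicationMasterProtocol)
      rpc.getProxy(ApplicationMasterProtocol.class, serverAddr, conf);
  amClient.registerApplicationMaster(
      RegisterApplicationMasterRequest.newInstance("", 0, ""));

  DistributedSchedulingAMProtocol dsClient = (DistributedSchedulingAMProtocol)
      rpc.getProxy(DistributedSchedulingAMProtocol.class, serverAddr, conf);
  dsClient.registerApplicationMasterForDistributedScheduling(
      RegisterApplicationMasterRequest.newInstance("", 0, ""));
}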
*/ public class TestDistributedSchedulingAMService { - // Test if the DistributedSchedulingAMService can handle both DSProtocol as + // Test if the OpportunisticContainerAllocatingAMService can handle both DSProtocol as // well as AMProtocol clients @Test public void testRPCWrapping() throws Exception { @@ -111,8 +111,9 @@ public Configuration getYarnConfiguration() { Resource.newInstance(1, 2), 1, true, "exp", ExecutionTypeRequest.newInstance( ExecutionType.OPPORTUNISTIC, true)))); - DistributedSchedulingAMService service = + OpportunisticContainerAllocatingAMService service = createService(factory, rmContext, c); + conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); Server server = service.getServer(rpc, conf, addr, null); server.start(); @@ -195,9 +196,9 @@ public Configuration getYarnConfiguration() { false, dsfinishResp.getIsUnregistered()); } - private DistributedSchedulingAMService createService(final RecordFactory + private OpportunisticContainerAllocatingAMService createService(final RecordFactory factory, final RMContext rmContext, final Container c) { - return new DistributedSchedulingAMService(rmContext, null) { + return new OpportunisticContainerAllocatingAMService(rmContext, null) { @Override public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws