diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java index dfe85f2..b1fd08e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java @@ -91,7 +91,8 @@ public void doTest(int numMappers, int numReducers, int numNodes, Configuration conf = new Configuration(); // Start the mini-MR and mini-DFS clusters conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); - conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); + conf.setBoolean(YarnConfiguration. + OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true); dfsCluster = new MiniDFSCluster.Builder(conf) .numDataNodes(numNodes).build(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6c921cd..d1f410b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -306,55 +306,60 @@ public static boolean isAclEnabled(Configuration conf) { YARN_PREFIX + "distributed-scheduling.enabled"; public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false; - /** Minimum memory (in MB) used for allocating a container through distributed - * scheduling. */ - public static final String DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB = - YARN_PREFIX + "distributed-scheduling.min-container-memory-mb"; - public static final int DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT = 512; - - /** Minimum virtual CPU cores used for allocating a container through - * distributed scheduling. */ - public static final String DIST_SCHEDULING_MIN_CONTAINER_VCORES = - YARN_PREFIX + "distributed-scheduling.min-container-vcores"; - public static final int DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT = 1; - - /** Maximum memory (in MB) used for allocating a container through distributed - * scheduling. */ - public static final String DIST_SCHEDULING_MAX_MEMORY_MB = - YARN_PREFIX + "distributed-scheduling.max-container-memory-mb"; - public static final int DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT = 2048; - - /** Maximum virtual CPU cores used for allocating a container through - * distributed scheduling. */ - public static final String DIST_SCHEDULING_MAX_CONTAINER_VCORES = - YARN_PREFIX + "distributed-scheduling.max-container-vcores"; - public static final int DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT = 4; - - /** Incremental memory (in MB) used for allocating a container through - * distributed scheduling. 
*/ - public static final String DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB = - YARN_PREFIX + "distributed-scheduling.incr-container-memory-mb"; - public static final int DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT = + /** Setting that controls whether opportunistic container allocation + * is enabled or not. */ + public static final String OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED = + YARN_PREFIX + "opportunistic-container-allocation.enabled"; + public static final boolean + OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT = false; + + /** Minimum memory (in MB) used for allocating an opportunistic container. */ + public static final String OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB = + YARN_PREFIX + "opportunistic-containers.min-memory-mb"; + public static final int OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT = 512; + + /** Minimum virtual CPU cores used for allocating an opportunistic container. + * */ + public static final String OPPORTUNISTIC_CONTAINERS_MIN_VCORES = + YARN_PREFIX + "opportunistic-containers.min-vcores"; + public static final int OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT = 1; + + /** Maximum memory (in MB) used for allocating an opportunistic container. */ + public static final String OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB = + YARN_PREFIX + "opportunistic-containers.max-memory-mb"; + public static final int OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT = 2048; + + /** Maximum virtual CPU cores used for allocating an opportunistic container. + * */ + public static final String OPPORTUNISTIC_CONTAINERS_MAX_VCORES = + YARN_PREFIX + "opportunistic-containers.max-vcores"; + public static final int OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT = 4; + + /** Incremental memory (in MB) used for allocating an opportunistic container. + * */ + public static final String OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB = + YARN_PREFIX + "opportunistic-containers.incr-memory-mb"; + public static final int OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT = 512; - /** Incremental virtual CPU cores used for allocating a container through - * distributed scheduling. */ - public static final String DIST_SCHEDULING_INCR_CONTAINER_VCORES = - YARN_PREFIX + "distributed-scheduling.incr-vcores"; - public static final int DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT = 1; - - /** Container token expiry for container allocated via distributed - * scheduling. */ - public static final String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS = - YARN_PREFIX + "distributed-scheduling.container-token-expiry-ms"; - public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT = + /** Incremental virtual CPU cores used for allocating an opportunistic + * container. */ + public static final String OPPORTUNISTIC_CONTAINERS_INCR_VCORES = + YARN_PREFIX + "opportunistic-containers.incr-vcores"; + public static final int OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT = 1; + + /** Container token expiry for opportunistic containers. */ + public static final String OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS = + YARN_PREFIX + "opportunistic-containers.container-token-expiry-ms"; + public static final int OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT = 600000; - /** Number of nodes to be used by the LocalScheduler of a NodeManager for - * dispatching containers during distributed scheduling. 
*/ - public static final String DIST_SCHEDULING_NODES_NUMBER_USED = - YARN_PREFIX + "distributed-scheduling.nodes-used"; - public static final int DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT = 10; + /** Number of nodes to be used by the Opportunistic Container allocator for + * dispatching containers during container allocation. */ + public static final String OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED = + YARN_PREFIX + "opportunistic-container-allocation.nodes-used"; + public static final int OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT = + 10; /** Frequency for computing least loaded NMs. */ public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS = @@ -2829,6 +2834,18 @@ public static String getClusterId(Configuration conf) { return clusterId; } + public static boolean isDistSchedulingEnabled(Configuration conf) { + return conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, + YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT); + } + + public static boolean isOpportunisticContainerAllocationEnabled( + Configuration conf) { + return conf.getBoolean( + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT); + } + // helper methods for timeline service configuration /** * Returns whether the timeline service is enabled via configuration. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java index 71321e3..57249a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java @@ -84,7 +84,7 @@ * specifying OPPORTUNISTIC containers in its resource requests, * the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor * on the NM and the DistributedSchedulingProtocol used by the framework to talk - * to the DistributedSchedulingAMService running on the RM. + * to the OpportunisticContainerAllocatingAMService running on the RM. */ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest { @@ -105,7 +105,8 @@ public void doBefore() throws Exception { conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); - conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); + conf.setBoolean(YarnConfiguration. + OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true); cluster.init(conf); cluster.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 3ebdc99..7305283 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2761,72 +2761,76 @@ - Minimum memory (in MB) used for allocating a container through distributed - scheduling. + Setting that controls whether opportunistic container allocation + is enabled. 
- yarn.distributed-scheduling.min-container-memory-mb + yarn.opportunistic-container-allocation.enabled + false + + + + + Minimum memory (in MB) used for allocating an opportunistic container. + + yarn.opportunistic-containers.min-memory-mb 512 - Minimum virtual CPU cores used for allocating a container through - distributed scheduling. + Minimum virtual CPU cores used for allocating an opportunistic container. - yarn.distributed-scheduling.min-container-vcores + yarn.opportunistic-containers.min-vcores 1 - Maximum memory (in MB) used for allocating a container through distributed - scheduling. + Maximum memory (in MB) used for allocating an opportunistic container. - yarn.distributed-scheduling.max-container-memory-mb + yarn.opportunistic-containers.max-memory-mb 2048 - Maximum virtual CPU cores used for allocating a container through - distributed scheduling. + Maximum virtual CPU cores used for allocating an opportunistic container. - yarn.distributed-scheduling.max-container-vcores + yarn.opportunistic-containers.max-vcores 4 - Incremental memory (in MB) used for allocating a container through - distributed scheduling. + Incremental memory (in MB) used for allocating an opportunistic container. - yarn.distributed-scheduling.incr-container-memory-mb + yarn.opportunistic-containers.incr-memory-mb 512 - Incremental virtual CPU cores used for allocating a container through - distributed scheduling. + Incremental virtual CPU cores used for allocating an opportunistic + container. - yarn.distributed-scheduling.incr-vcores + yarn.opportunistic-containers.incr-vcores 1 - Container token expiry for container allocated via distributed scheduling. + Container token expiry for opportunistic containers. - yarn.distributed-scheduling.container-token-expiry-ms + yarn.opportunistic-containers.container-token-expiry-ms 600000 - Number of nodes to be used by the LocalScheduler of a NodeManager for - dispatching containers during distributed scheduling. + Number of nodes to be used by the Opportunistic Container Allocator for + dispatching containers during container allocation. - yarn.distributed-scheduling.nodes-used + yarn.opportunistic-container-allocation.nodes-used 10 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java new file mode 100644 index 0000000..708dca2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -0,0 +1,378 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; + +import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +/** + *
<p> + * The OpportunisticContainerAllocator allocates containers on a given list of + * nodes, after modifying the container sizes to respect the limits set by the + * ResourceManager. It tries to distribute the containers as evenly as possible. + * </p>
+ */ +public class OpportunisticContainerAllocator { + + /** + * This class encapsulates application specific parameters used to build a + * Container. + */ + public static class AllocationParams { + private Resource maxResource; + private Resource minResource; + private Resource incrementResource; + private int containerTokenExpiryInterval; + + /** + * Return Max Resource. + * @return Resource + */ + public Resource getMaxResource() { + return maxResource; + } + + /** + * Set Max Resource. + * @param maxResource Resource + */ + public void setMaxResource(Resource maxResource) { + this.maxResource = maxResource; + } + + /** + * Get Min Resource. + * @return Resource + */ + public Resource getMinResource() { + return minResource; + } + + /** + * Set Min Resource. + * @param minResource Resource + */ + public void setMinResource(Resource minResource) { + this.minResource = minResource; + } + + /** + * Get Incrmental Resource. + * @return Incremental Resource + */ + public Resource getIncrementResource() { + return incrementResource; + } + + /** + * Set Incremental resource. + * @param incrementResource Resource + */ + public void setIncrementResource(Resource incrementResource) { + this.incrementResource = incrementResource; + } + + /** + * Get Container Token Expiry interval. + * @return Container Token Expiry interval + */ + public int getContainerTokenExpiryInterval() { + return containerTokenExpiryInterval; + } + + /** + * Set Container Token Expiry time in ms. + * @param containerTokenExpiryInterval Container Token Expiry in ms + */ + public void setContainerTokenExpiryInterval( + int containerTokenExpiryInterval) { + this.containerTokenExpiryInterval = containerTokenExpiryInterval; + } + } + + /** + * A Container Id Generator. + */ + public static class ContainerIdGenerator { + + protected volatile AtomicLong containerIdCounter = new AtomicLong(1); + + /** + * This method can reset the generator to a specific value. + * @param containerIdStart containerId + */ + public void resetContainerIdCounter(long containerIdStart) { + this.containerIdCounter.set(containerIdStart); + } + + /** + * Sets the underlying Atomic Long. To be used when implementation needs to + * share the underlying AtomicLong of an existing counter. + * @param counter AtomicLong + */ + public void setContainerIdCounter(AtomicLong counter) { + this.containerIdCounter = counter; + } + + /** + * Generates a new long value. Default implementation increments the + * underlying AtomicLong. Sub classes are encouraged to over-ride this + * behaviour. + * @return Counter. + */ + public long generateContainerId() { + return this.containerIdCounter.incrementAndGet(); + } + } + + static class PartitionedResourceRequests { + private List guaranteed = new ArrayList<>(); + private List opportunistic = new ArrayList<>(); + public List getGuaranteed() { + return guaranteed; + } + public List getOpportunistic() { + return opportunistic; + } + } + + private static final Log LOG = + LogFactory.getLog(OpportunisticContainerAllocator.class); + + private static final ResourceCalculator RESOURCE_CALCULATOR = + new DominantResourceCalculator(); + + private final BaseContainerTokenSecretManager tokenSecretManager; + private int webpagePort; + + /** + * Create a new Opportunistic Container Allocator. 
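+ * The given secret manager is used to sign the container tokens locally + * (see buildContainer below), since opportunistic containers are allocated + * without a round trip to the ResourceManager scheduler.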
+ * @param tokenSecretManager TokenSecretManager + * @param webpagePort Webpage Port + */ + public OpportunisticContainerAllocator( + BaseContainerTokenSecretManager tokenSecretManager, int webpagePort) { + this.tokenSecretManager = tokenSecretManager; + this.webpagePort = webpagePort; + } + + /** + * Entry point into the Opportunistic Container Allocator. + * @param request AllocateRequest + * @param applicationAttemptId ApplicationAttemptId + * @param appContext App Specific OpportunisticContainerContext + * @param rmIdentifier RM Identifier + * @param appSubmitter App Submitter + * @return List of Containers. + * @throws YarnException YarnException + */ + public List allocateContainers( + AllocateRequest request, ApplicationAttemptId applicationAttemptId, + OpportunisticContainerContext appContext, long rmIdentifier, + String appSubmitter) throws YarnException { + // Partition requests into GUARANTEED and OPPORTUNISTIC reqs + PartitionedResourceRequests partitionedAsks = + partitionAskList(request.getAskList()); + + List releasedContainers = request.getReleaseList(); + int numReleasedContainers = releasedContainers.size(); + if (numReleasedContainers > 0) { + LOG.info("AttemptID: " + applicationAttemptId + " released: " + + numReleasedContainers); + appContext.getContainersAllocated().removeAll(releasedContainers); + } + + // Also, update black list + ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest(); + if (rbr != null) { + appContext.getBlacklist().removeAll(rbr.getBlacklistRemovals()); + appContext.getBlacklist().addAll(rbr.getBlacklistAdditions()); + } + + // Add OPPORTUNISTIC reqs to the outstanding reqs + appContext.addToOutstandingReqs(partitionedAsks.getOpportunistic()); + + List allocatedContainers = new ArrayList<>(); + for (Priority priority : + appContext.getOutstandingOpReqs().descendingKeySet()) { + // Allocated containers : + // Key = Requested Capability, + // Value = List of Containers of given Cap (The actual container size + // might be different than what is requested.. 
which is why + // we need the requested capability (key) to match against + // the outstanding reqs) + Map> allocated = allocate(rmIdentifier, + appContext, priority, applicationAttemptId, appSubmitter); + for (Map.Entry> e : allocated.entrySet()) { + appContext.matchAllocationToOutstandingRequest( + e.getKey(), e.getValue()); + allocatedContainers.addAll(e.getValue()); + } + } + + // Send all the GUARANTEED Reqs to RM + request.setAskList(partitionedAsks.getGuaranteed()); + return allocatedContainers; + } + + private Map> allocate(long rmIdentifier, + OpportunisticContainerContext appContext, Priority priority, + ApplicationAttemptId appAttId, String userName) throws YarnException { + Map> containers = new HashMap<>(); + for (ResourceRequest anyAsk : + appContext.getOutstandingOpReqs().get(priority).values()) { + allocateContainersInternal(rmIdentifier, appContext.getAppParams(), + appContext.getContainerIdGenerator(), appContext.getBlacklist(), + appAttId, appContext.getNodeMap(), userName, containers, anyAsk); + LOG.info("Opportunistic allocation requested for [" + + "priority=" + anyAsk.getPriority() + + ", num_containers=" + anyAsk.getNumContainers() + + ", capability=" + anyAsk.getCapability() + "]" + + " allocated = " + containers.get(anyAsk.getCapability()).size()); + } + return containers; + } + + private void allocateContainersInternal(long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + Set blacklist, ApplicationAttemptId id, + Map allNodes, String userName, + Map> containers, ResourceRequest anyAsk) + throws YarnException { + int toAllocate = anyAsk.getNumContainers() + - (containers.isEmpty() ? 0 : + containers.get(anyAsk.getCapability()).size()); + + List nodesForScheduling = new ArrayList<>(); + for (Entry nodeEntry : allNodes.entrySet()) { + // Do not use blacklisted nodes for scheduling. 
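+ // The remaining nodes are then walked round-robin (nextNodeToSchedule + // below), which spreads the allocated containers as evenly as possible + // across the eligible nodes.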
+ if (blacklist.contains(nodeEntry.getKey())) { + continue; + } + nodesForScheduling.add(nodeEntry.getValue()); + } + int numAllocated = 0; + int nextNodeToSchedule = 0; + for (int numCont = 0; numCont < toAllocate; numCont++) { + nextNodeToSchedule++; + nextNodeToSchedule %= nodesForScheduling.size(); + NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule); + Container container = buildContainer(rmIdentifier, appParams, idCounter, + anyAsk, id, userName, nodeId); + List cList = containers.get(anyAsk.getCapability()); + if (cList == null) { + cList = new ArrayList<>(); + containers.put(anyAsk.getCapability(), cList); + } + cList.add(container); + numAllocated++; + LOG.info("Allocated [" + container.getId() + "] as opportunistic."); + } + LOG.info("Allocated " + numAllocated + " opportunistic containers."); + } + + private Container buildContainer(long rmIdentifier, + AllocationParams appParams, ContainerIdGenerator idCounter, + ResourceRequest rr, ApplicationAttemptId id, String userName, + NodeId nodeId) throws YarnException { + ContainerId cId = + ContainerId.newContainerId(id, idCounter.generateContainerId()); + + // Normalize the resource asks (Similar to what the the RM scheduler does + // before accepting an ask) + Resource capability = normalizeCapability(appParams, rr); + + long currTime = System.currentTimeMillis(); + ContainerTokenIdentifier containerTokenIdentifier = + new ContainerTokenIdentifier( + cId, nodeId.getHost() + ":" + nodeId.getPort(), userName, + capability, currTime + appParams.containerTokenExpiryInterval, + tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier, + rr.getPriority(), currTime, + null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK, + ExecutionType.OPPORTUNISTIC); + byte[] pwd = + tokenSecretManager.createPassword(containerTokenIdentifier); + Token containerToken = newContainerToken(nodeId, pwd, + containerTokenIdentifier); + Container container = BuilderUtils.newContainer( + cId, nodeId, nodeId.getHost() + ":" + webpagePort, + capability, rr.getPriority(), containerToken, + containerTokenIdentifier.getExecutionType(), + rr.getAllocationRequestId()); + return container; + } + + private Resource normalizeCapability(AllocationParams appParams, + ResourceRequest ask) { + return Resources.normalize(RESOURCE_CALCULATOR, + ask.getCapability(), appParams.minResource, appParams.maxResource, + appParams.incrementResource); + } + + private static Token newContainerToken(NodeId nodeId, byte[] password, + ContainerTokenIdentifier tokenIdentifier) { + // RPC layer client expects ip:port as service for tokens + InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(), + nodeId.getPort()); + // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token + Token containerToken = Token.newInstance(tokenIdentifier.getBytes(), + ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil + .buildTokenService(addr).toString()); + return containerToken; + } + + private PartitionedResourceRequests partitionAskList(List + askList) { + PartitionedResourceRequests partitionedRequests = + new PartitionedResourceRequests(); + for (ResourceRequest rr : askList) { + if (rr.getExecutionTypeRequest().getExecutionType() == + ExecutionType.OPPORTUNISTIC) { + partitionedRequests.getOpportunistic().add(rr); + } else { + partitionedRequests.getGuaranteed().add(rr); + } + } + return partitionedRequests; + } +} diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java new file mode 100644 index 0000000..1b701ea --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.scheduler; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.AllocationParams; +import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator; + +/** + * This encapsulates application specific information used by the + * Opportunistic Container Allocator to allocate containers. + */ +public class OpportunisticContainerContext { + + private static final Logger LOG = LoggerFactory + .getLogger(OpportunisticContainerContext.class); + + // Currently just used to keep track of allocated containers. + // Can be used for reporting stats later. + private Set containersAllocated = new HashSet<>(); + private AllocationParams appParams = + new AllocationParams(); + private ContainerIdGenerator containerIdGenerator = + new ContainerIdGenerator(); + + private Map nodeMap = new LinkedHashMap<>(); + + // Mapping of NodeId to NodeTokens. Populated either from RM response or + // generated locally if required. + private Map nodeTokens = new HashMap<>(); + private final Set blacklist = new HashSet<>(); + + // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority, + // Resource Name (Host/rack/any) and capability. This mapping is required + // to match a received Container to an outstanding OPPORTUNISTIC + // ResourceRequest (ask). 
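+ // The TreeMap keeps the outstanding requests sorted by Priority; the + // allocator iterates its descendingKeySet() when satisfying them.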
+ private final TreeMap> + outstandingOpReqs = new TreeMap<>(); + + public Set getContainersAllocated() { + return containersAllocated; + } + + public OpportunisticContainerAllocator.AllocationParams getAppParams() { + return appParams; + } + + public ContainerIdGenerator getContainerIdGenerator() { + return containerIdGenerator; + } + + public void setContainerIdGenerator( + ContainerIdGenerator containerIdGenerator) { + this.containerIdGenerator = containerIdGenerator; + } + + public Map getNodeMap() { + return nodeMap; + } + + public Map getNodeTokens() { + return nodeTokens; + } + + public Set getBlacklist() { + return blacklist; + } + + public TreeMap> + getOutstandingOpReqs() { + return outstandingOpReqs; + } + + /** + * Takes a list of ResourceRequests (asks), extracts the key information viz. + * (Priority, ResourceName, Capability) and adds to the outstanding + * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce + * the current YARN constraint that only a single ResourceRequest can exist at + * a give Priority and Capability. + * + * @param resourceAsks the list with the {@link ResourceRequest}s + */ + public void addToOutstandingReqs(List resourceAsks) { + for (ResourceRequest request : resourceAsks) { + Priority priority = request.getPriority(); + + // TODO: Extend for Node/Rack locality. We only handle ANY requests now + if (!ResourceRequest.isAnyLocation(request.getResourceName())) { + continue; + } + + if (request.getNumContainers() == 0) { + continue; + } + + Map reqMap = + outstandingOpReqs.get(priority); + if (reqMap == null) { + reqMap = new HashMap<>(); + outstandingOpReqs.put(priority, reqMap); + } + + ResourceRequest resourceRequest = reqMap.get(request.getCapability()); + if (resourceRequest == null) { + resourceRequest = request; + reqMap.put(request.getCapability(), request); + } else { + resourceRequest.setNumContainers( + resourceRequest.getNumContainers() + request.getNumContainers()); + } + if (ResourceRequest.isAnyLocation(request.getResourceName())) { + LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority + + ", with capability = " + request.getCapability() + " ) : " + + resourceRequest.getNumContainers()); + } + } + } + + /** + * This method matches a returned list of Container Allocations to any + * outstanding OPPORTUNISTIC ResourceRequest. 
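+ * Every matched request has its container count decremented, and an ask + * is removed from the outstanding map once it is fully satisfied.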
+ * @param capability Capability + * @param allocatedContainers Allocated Containers + */ + public void matchAllocationToOutstandingRequest(Resource capability, + List allocatedContainers) { + for (Container c : allocatedContainers) { + containersAllocated.add(c.getId()); + Map asks = + outstandingOpReqs.get(c.getPriority()); + + if (asks == null) { + continue; + } + + ResourceRequest rr = asks.get(capability); + if (rr != null) { + rr.setNumContainers(rr.getNumContainers() - 1); + if (rr.getNumContainers() == 0) { + asks.remove(capability); + } + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/package-info.java new file mode 100644 index 0000000..dc4aaf1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +@InterfaceAudience.Public +package org.apache.hadoop.yarn.server.scheduler; +import org.apache.hadoop.classification.InterfaceAudience; + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 131eaa3..88bc29c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; -import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 5bfbb8d..280a086 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -72,7 +72,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; -import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; @@ -336,7 +336,8 @@ protected void serviceInit(Configuration conf) throws Exception { addService(nodeHealthChecker); boolean isDistSchedulingEnabled = - conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, + conf.getBoolean(YarnConfiguration. 
+ OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT); this.context = createNMContext(containerTokenSecretManager, @@ -370,8 +371,8 @@ protected void serviceInit(Configuration conf) throws Exception { ((NMContext) context).setWebServer(webServer); ((NMContext) context).setQueueableContainerAllocator( - new OpportunisticContainerAllocator(nodeStatusUpdater, context, - webServer.getPort())); + new OpportunisticContainerAllocator( + context.getContainerTokenSecretManager(), webServer.getPort())); dispatcher.register(ContainerManagerEventType.class, containerManager); dispatcher.register(NodeManagerEventType.class, this); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java index 75fe022..efbdfb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java @@ -62,7 +62,7 @@ AbstractRequestInterceptor { private static final Logger LOG = LoggerFactory .getLogger(DefaultRequestInterceptor.class); - private DistributedSchedulingAMProtocol rmClient; + private ApplicationMasterProtocol rmClient; private UserGroupInformation user = null; @Override @@ -76,15 +76,7 @@ public void init(AMRMProxyApplicationContext appContext) { user.addToken(appContext.getAMRMToken()); final Configuration conf = this.getConf(); - rmClient = user.doAs( - new PrivilegedExceptionAction() { - @Override - public DistributedSchedulingAMProtocol run() throws Exception { - setAMRMTokenService(conf); - return ServerRMProxy.createRMProxy(conf, - DistributedSchedulingAMProtocol.class); - } - }); + rmClient = createRMClient(appContext, conf); } catch (IOException e) { String message = "Error while creating of RM app master service proxy for attemptId:" @@ -100,6 +92,32 @@ public DistributedSchedulingAMProtocol run() throws Exception { } } + private ApplicationMasterProtocol createRMClient( + AMRMProxyApplicationContext appContext, final Configuration conf) + throws IOException, InterruptedException { + if (appContext.getNMCotext().isDistributedSchedulingEnabled()) { + return user.doAs( + new PrivilegedExceptionAction() { + @Override + public DistributedSchedulingAMProtocol run() throws Exception { + setAMRMTokenService(conf); + return ServerRMProxy.createRMProxy(conf, + DistributedSchedulingAMProtocol.class); + } + }); + } else { + return user.doAs( + new PrivilegedExceptionAction() { + @Override + public ApplicationMasterProtocol run() throws Exception { + setAMRMTokenService(conf); + return ClientRMProxy.createRMProxy(conf, + ApplicationMasterProtocol.class); + } + }); + } + } + @Override public RegisterApplicationMasterResponse registerApplicationMaster( final RegisterApplicationMasterRequest request) @@ -127,9 +145,15 @@ public AllocateResponse allocate(final AllocateRequest request) registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequest request) throws YarnException, IOException { - LOG.info("Forwarding 
registerApplicationMasterForDistributedScheduling" + - "request to the real YARN RM"); - return rmClient.registerApplicationMasterForDistributedScheduling(request); + if (getApplicationContext().getNMCotext() + .isDistributedSchedulingEnabled()) { + LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" + + "request to the real YARN RM"); + return ((DistributedSchedulingAMProtocol)rmClient) + .registerApplicationMasterForDistributedScheduling(request); + } else { + throw new YarnException("Distributed Scheduling is not enabled !!"); + } } @Override @@ -140,13 +164,18 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( LOG.debug("Forwarding allocateForDistributedScheduling request" + "to the real YARN RM"); } - DistributedSchedulingAllocateResponse allocateResponse = - rmClient.allocateForDistributedScheduling(request); - if (allocateResponse.getAllocateResponse().getAMRMToken() != null) { - updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken()); + if (getApplicationContext().getNMCotext() + .isDistributedSchedulingEnabled()) { + DistributedSchedulingAllocateResponse allocateResponse = + ((DistributedSchedulingAMProtocol)rmClient) + .allocateForDistributedScheduling(request); + if (allocateResponse.getAllocateResponse().getAMRMToken() != null) { + updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken()); + } + return allocateResponse; + } else { + throw new YarnException("Distributed Scheduling is not enabled !!"); } - - return allocateResponse; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java index bfb12ee..368858c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java @@ -32,34 +32,23 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; -import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor; - - - import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; + +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; /** *
<p>
The DistributedScheduler runs on the NodeManager and is modeled as an @@ -76,74 +65,49 @@ */ public final class DistributedScheduler extends AbstractRequestInterceptor { - static class PartitionedResourceRequests { - private List guaranteed = new ArrayList<>(); - private List opportunistic = new ArrayList<>(); - public List getGuaranteed() { - return guaranteed; - } - public List getOpportunistic() { - return opportunistic; - } - } - - static class DistributedSchedulerParams { - Resource maxResource; - Resource minResource; - Resource incrementResource; - int containerTokenExpiryInterval; - } - private static final Logger LOG = LoggerFactory .getLogger(DistributedScheduler.class); private final static RecordFactory RECORD_FACTORY = RecordFactoryProvider.getRecordFactory(null); - // Currently just used to keep track of allocated containers. - // Can be used for reporting stats later. - private Set containersAllocated = new HashSet<>(); - - private DistributedSchedulerParams appParams = - new DistributedSchedulerParams(); - private final OpportunisticContainerAllocator.ContainerIdCounter - containerIdCounter = - new OpportunisticContainerAllocator.ContainerIdCounter(); - private Map nodeList = new LinkedHashMap<>(); - - // Mapping of NodeId to NodeTokens. Populated either from RM response or - // generated locally if required. - private Map nodeTokens = new HashMap<>(); - final Set blacklist = new HashSet<>(); - - // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority, - // Resource Name (Host/rack/any) and capability. This mapping is required - // to match a received Container to an outstanding OPPORTUNISTIC - // ResourceRequest (ask). - final TreeMap> - outstandingOpReqs = new TreeMap<>(); + private OpportunisticContainerContext oppContainerContext = + new OpportunisticContainerContext(); private ApplicationAttemptId applicationAttemptId; private OpportunisticContainerAllocator containerAllocator; private NMTokenSecretManagerInNM nmSecretManager; private String appSubmitter; - - public void init(AMRMProxyApplicationContext appContext) { - super.init(appContext); - initLocal(appContext.getApplicationAttemptId(), - appContext.getNMCotext().getContainerAllocator(), - appContext.getNMCotext().getNMTokenSecretManager(), - appContext.getUser()); + private long rmIdentifier; + + public void init(AMRMProxyApplicationContext applicationContext) { + super.init(applicationContext); + initLocal(applicationContext.getNMCotext().getNodeStatusUpdater() + .getRMIdentifier(), + applicationContext.getApplicationAttemptId(), + applicationContext.getNMCotext().getContainerAllocator(), + applicationContext.getNMCotext().getNMTokenSecretManager(), + applicationContext.getUser()); } @VisibleForTesting - void initLocal(ApplicationAttemptId applicationAttemptId, - OpportunisticContainerAllocator containerAllocator, + void initLocal(long rmId, ApplicationAttemptId appAttemptId, + OpportunisticContainerAllocator oppContainerAllocator, NMTokenSecretManagerInNM nmSecretManager, String appSubmitter) { - this.applicationAttemptId = applicationAttemptId; - this.containerAllocator = containerAllocator; + this.rmIdentifier = rmId; + this.applicationAttemptId = appAttemptId; + this.containerAllocator = oppContainerAllocator; this.nmSecretManager = nmSecretManager; this.appSubmitter = appSubmitter; + + // Overrides the Generator to decrement container id. 
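+ // The RM hands the attempt a containerIdStart to count down from, which + // is meant to keep these locally generated ids from clashing with the + // ids handed out by the RM scheduler, which count upwards.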
+ this.oppContainerContext.setContainerIdGenerator( + new OpportunisticContainerAllocator.ContainerIdGenerator() { + @Override + public long generateContainerId() { + return this.containerIdCounter.decrementAndGet(); + } + }); } /** @@ -202,7 +166,8 @@ private void updateResponseWithNMTokens(AllocateResponse response, if (allocatedContainers.size() > 0) { response.getAllocatedContainers().addAll(allocatedContainers); for (Container alloc : allocatedContainers) { - if (!nodeTokens.containsKey(alloc.getNodeId())) { + if (!oppContainerContext.getNodeTokens().containsKey( + alloc.getNodeId())) { newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc)); } } @@ -212,115 +177,34 @@ private void updateResponseWithNMTokens(AllocateResponse response, } } - private PartitionedResourceRequests partitionAskList(List - askList) { - PartitionedResourceRequests partitionedRequests = - new PartitionedResourceRequests(); - for (ResourceRequest rr : askList) { - if (rr.getExecutionTypeRequest().getExecutionType() == - ExecutionType.OPPORTUNISTIC) { - partitionedRequests.getOpportunistic().add(rr); - } else { - partitionedRequests.getGuaranteed().add(rr); - } - } - return partitionedRequests; - } - private void updateParameters( RegisterDistributedSchedulingAMResponse registerResponse) { - appParams.minResource = registerResponse.getMinContainerResource(); - appParams.maxResource = registerResponse.getMaxContainerResource(); - appParams.incrementResource = - registerResponse.getIncrContainerResource(); - if (appParams.incrementResource == null) { - appParams.incrementResource = appParams.minResource; + oppContainerContext.getAppParams().setMinResource( + registerResponse.getMinContainerResource()); + oppContainerContext.getAppParams().setMaxResource( + registerResponse.getMaxContainerResource()); + oppContainerContext.getAppParams().setIncrementResource( + registerResponse.getIncrContainerResource()); + if (oppContainerContext.getAppParams().getIncrementResource() == null) { + oppContainerContext.getAppParams().setIncrementResource( + oppContainerContext.getAppParams().getMinResource()); } - appParams.containerTokenExpiryInterval = registerResponse - .getContainerTokenExpiryInterval(); + oppContainerContext.getAppParams().setContainerTokenExpiryInterval( + registerResponse.getContainerTokenExpiryInterval()); - containerIdCounter + oppContainerContext.getContainerIdGenerator() .resetContainerIdCounter(registerResponse.getContainerIdStart()); setNodeList(registerResponse.getNodesForScheduling()); } - /** - * Takes a list of ResourceRequests (asks), extracts the key information viz. - * (Priority, ResourceName, Capability) and adds to the outstanding - * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce - * the current YARN constraint that only a single ResourceRequest can exist at - * a give Priority and Capability. - * - * @param resourceAsks the list with the {@link ResourceRequest}s - */ - public void addToOutstandingReqs(List resourceAsks) { - for (ResourceRequest request : resourceAsks) { - Priority priority = request.getPriority(); - - // TODO: Extend for Node/Rack locality. 
We only handle ANY requests now - if (!ResourceRequest.isAnyLocation(request.getResourceName())) { - continue; - } - - if (request.getNumContainers() == 0) { - continue; - } - - Map reqMap = - this.outstandingOpReqs.get(priority); - if (reqMap == null) { - reqMap = new HashMap<>(); - this.outstandingOpReqs.put(priority, reqMap); - } - - ResourceRequest resourceRequest = reqMap.get(request.getCapability()); - if (resourceRequest == null) { - resourceRequest = request; - reqMap.put(request.getCapability(), request); - } else { - resourceRequest.setNumContainers( - resourceRequest.getNumContainers() + request.getNumContainers()); - } - if (ResourceRequest.isAnyLocation(request.getResourceName())) { - LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority - + ", with capability = " + request.getCapability() + " ) : " - + resourceRequest.getNumContainers()); - } - } - } - - /** - * This method matches a returned list of Container Allocations to any - * outstanding OPPORTUNISTIC ResourceRequest. - */ - private void matchAllocationToOutstandingRequest(Resource capability, - List allocatedContainers) { - for (Container c : allocatedContainers) { - containersAllocated.add(c.getId()); - Map asks = - outstandingOpReqs.get(c.getPriority()); - - if (asks == null) - continue; - - ResourceRequest rr = asks.get(capability); - if (rr != null) { - rr.setNumContainers(rr.getNumContainers() - 1); - if (rr.getNumContainers() == 0) { - asks.remove(capability); - } - } - } - } - private void setNodeList(List nodeList) { - this.nodeList.clear(); + oppContainerContext.getNodeMap().clear(); addToNodeList(nodeList); } private void addToNodeList(List nodes) { for (NodeId n : nodes) { - this.nodeList.put(n.getHost(), n); + oppContainerContext.getNodeMap().put(n.getHost(), n); } } @@ -345,52 +229,13 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( LOG.debug("Forwarding allocate request to the" + "Distributed Scheduler Service on YARN RM"); } - // Partition requests into GUARANTEED and OPPORTUNISTIC reqs - PartitionedResourceRequests partitionedAsks = - partitionAskList(request.getAllocateRequest().getAskList()); - - List releasedContainers = - request.getAllocateRequest().getReleaseList(); - int numReleasedContainers = releasedContainers.size(); - if (numReleasedContainers > 0) { - LOG.info("AttemptID: " + applicationAttemptId + " released: " - + numReleasedContainers); - containersAllocated.removeAll(releasedContainers); - } - - // Also, update black list - ResourceBlacklistRequest rbr = - request.getAllocateRequest().getResourceBlacklistRequest(); - if (rbr != null) { - blacklist.removeAll(rbr.getBlacklistRemovals()); - blacklist.addAll(rbr.getBlacklistAdditions()); - } - - // Add OPPORTUNISTIC reqs to the outstanding reqs - addToOutstandingReqs(partitionedAsks.getOpportunistic()); - - List allocatedContainers = new ArrayList<>(); - for (Priority priority : outstandingOpReqs.descendingKeySet()) { - // Allocated containers : - // Key = Requested Capability, - // Value = List of Containers of given Cap (The actual container size - // might be different than what is requested.. 
which is why - // we need the requested capability (key) to match against - // the outstanding reqs) - Map> allocated = - containerAllocator.allocate(this.appParams, containerIdCounter, - outstandingOpReqs.get(priority).values(), blacklist, - applicationAttemptId, nodeList, appSubmitter); - for (Map.Entry> e : allocated.entrySet()) { - matchAllocationToOutstandingRequest(e.getKey(), e.getValue()); - allocatedContainers.addAll(e.getValue()); - } - } + List allocatedContainers = + containerAllocator.allocateContainers( + request.getAllocateRequest(), applicationAttemptId, + oppContainerContext, rmIdentifier, appSubmitter); request.setAllocatedContainers(allocatedContainers); - // Send all the GUARANTEED Reqs to RM - request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed()); DistributedSchedulingAllocateResponse dsResp = getNextInterceptor().allocateForDistributedScheduling(request); @@ -398,7 +243,7 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( setNodeList(dsResp.getNodesForScheduling()); List nmTokens = dsResp.getAllocateResponse().getNMTokens(); for (NMToken nmToken : nmTokens) { - nodeTokens.put(nmToken.getNodeId(), nmToken); + oppContainerContext.getNodeTokens().put(nmToken.getNodeId(), nmToken); } List completedContainers = @@ -407,7 +252,8 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( // Only account for opportunistic containers for (ContainerStatus cs : completedContainers) { if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) { - containersAllocated.remove(cs.getContainerId()); + oppContainerContext.getContainersAllocated() + .remove(cs.getContainerId()); } } @@ -417,9 +263,9 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( dsResp.getAllocateResponse(), nmTokens, allocatedContainers); if (LOG.isDebugEnabled()) { - LOG.debug( - "Number of opportunistic containers currently allocated by" + - "application: " + containersAllocated.size()); + LOG.debug("Number of opportunistic containers currently" + + "allocated by application: " + oppContainerContext + .getContainersAllocated().size()); } return dsResp; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java deleted file mode 100644 index 4723233..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.nodemanager.scheduler; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.yarn.api.records.*; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; -import org.apache.hadoop.yarn.server.api.ContainerType; -import org.apache.hadoop.yarn.server.nodemanager.Context; -import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; -import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler.DistributedSchedulerParams; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; - -import java.net.InetSocketAddress; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.atomic.AtomicLong; - -/** - *
- * The OpportunisticContainerAllocator allocates containers on a given list of - * nodes, after modifying the container sizes to respect the limits set by the - * ResourceManager. It tries to distribute the containers as evenly as possible. - * It also uses the NMTokenSecretManagerInNM to generate the - * required NM tokens for the allocated containers. - *
- */ -public class OpportunisticContainerAllocator { - - private static final Log LOG = - LogFactory.getLog(OpportunisticContainerAllocator.class); - - private static final ResourceCalculator RESOURCE_CALCULATOR = - new DominantResourceCalculator(); - - static class ContainerIdCounter { - final AtomicLong containerIdCounter = new AtomicLong(1); - - void resetContainerIdCounter(long containerIdStart) { - this.containerIdCounter.set(containerIdStart); - } - - long generateContainerId() { - return this.containerIdCounter.decrementAndGet(); - } - } - - private final NodeStatusUpdater nodeStatusUpdater; - private final Context context; - private int webpagePort; - - public OpportunisticContainerAllocator(NodeStatusUpdater nodeStatusUpdater, - Context context, int webpagePort) { - this.nodeStatusUpdater = nodeStatusUpdater; - this.context = context; - this.webpagePort = webpagePort; - } - - public Map> allocate( - DistributedSchedulerParams appParams, ContainerIdCounter idCounter, - Collection resourceAsks, Set blacklist, - ApplicationAttemptId appAttId, Map allNodes, - String userName) throws YarnException { - Map> containers = new HashMap<>(); - for (ResourceRequest anyAsk : resourceAsks) { - allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId, - allNodes, userName, containers, anyAsk); - LOG.info("Opportunistic allocation requested for [" - + "priority=" + anyAsk.getPriority() - + ", num_containers=" + anyAsk.getNumContainers() - + ", capability=" + anyAsk.getCapability() + "]" - + " allocated = " + containers.get(anyAsk.getCapability()).size()); - } - return containers; - } - - private void allocateOpportunisticContainers( - DistributedSchedulerParams appParams, ContainerIdCounter idCounter, - Set blacklist, ApplicationAttemptId id, - Map allNodes, String userName, - Map> containers, ResourceRequest anyAsk) - throws YarnException { - int toAllocate = anyAsk.getNumContainers() - - (containers.isEmpty() ? 0 : - containers.get(anyAsk.getCapability()).size()); - - List nodesForScheduling = new ArrayList<>(); - for (Entry nodeEntry : allNodes.entrySet()) { - // Do not use blacklisted nodes for scheduling. 
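For readability, the selection logic that follows boils down to: drop blacklisted hosts from the candidate set, then walk the survivors round-robin so successive opportunistic asks spread evenly across the cluster. A condensed sketch with illustrative names:

    List<NodeId> candidates = new ArrayList<>();
    for (Map.Entry<String, NodeId> e : allNodes.entrySet()) {
      if (!blacklist.contains(e.getKey())) {
        candidates.add(e.getValue()); // host is eligible
      }
    }
    int next = 0;
    for (int i = 0; i < toAllocate && !candidates.isEmpty(); i++) {
      next = (next + 1) % candidates.size(); // round-robin cursor
      NodeId target = candidates.get(next);
      // ... build one OPPORTUNISTIC container on target ...
    }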
- if (blacklist.contains(nodeEntry.getKey())) { - continue; - } - nodesForScheduling.add(nodeEntry.getValue()); - } - int numAllocated = 0; - int nextNodeToSchedule = 0; - for (int numCont = 0; numCont < toAllocate; numCont++) { - nextNodeToSchedule++; - nextNodeToSchedule %= nodesForScheduling.size(); - NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule); - Container container = buildContainer(appParams, idCounter, anyAsk, id, - userName, nodeId); - List cList = containers.get(anyAsk.getCapability()); - if (cList == null) { - cList = new ArrayList<>(); - containers.put(anyAsk.getCapability(), cList); - } - cList.add(container); - numAllocated++; - LOG.info("Allocated [" + container.getId() + "] as opportunistic."); - } - LOG.info("Allocated " + numAllocated + " opportunistic containers."); - } - - private Container buildContainer(DistributedSchedulerParams appParams, - ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id, - String userName, NodeId nodeId) throws YarnException { - ContainerId cId = - ContainerId.newContainerId(id, idCounter.generateContainerId()); - - // Normalize the resource asks (Similar to what the the RM scheduler does - // before accepting an ask) - Resource capability = normalizeCapability(appParams, rr); - - long currTime = System.currentTimeMillis(); - ContainerTokenIdentifier containerTokenIdentifier = - new ContainerTokenIdentifier( - cId, nodeId.getHost() + ":" + nodeId.getPort(), userName, - capability, currTime + appParams.containerTokenExpiryInterval, - context.getContainerTokenSecretManager().getCurrentKey().getKeyId(), - nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime, - null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK, - ExecutionType.OPPORTUNISTIC); - byte[] pwd = - context.getContainerTokenSecretManager().createPassword( - containerTokenIdentifier); - Token containerToken = newContainerToken(nodeId, pwd, - containerTokenIdentifier); - Container container = BuilderUtils.newContainer( - cId, nodeId, nodeId.getHost() + ":" + webpagePort, - capability, rr.getPriority(), containerToken, - containerTokenIdentifier.getExecutionType(), - rr.getAllocationRequestId()); - return container; - } - - private Resource normalizeCapability(DistributedSchedulerParams appParams, - ResourceRequest ask) { - return Resources.normalize(RESOURCE_CALCULATOR, - ask.getCapability(), appParams.minResource, appParams.maxResource, - appParams.incrementResource); - } - - public static Token newContainerToken(NodeId nodeId, byte[] password, - ContainerTokenIdentifier tokenIdentifier) { - // RPC layer client expects ip:port as service for tokens - InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(), - nodeId.getPort()); - // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token - Token containerToken = Token.newInstance(tokenIdentifier.getBytes(), - ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil - .buildTokenService(addr).toString()); - return containerToken; - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index f716d44..d8660dd 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -67,7 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; -import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java index b093b3b..8f1ae7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java @@ -38,11 +38,12 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; import org.apache.hadoop.yarn.server.api.records.MasterKey; -import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; + +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; @@ -189,7 +190,6 @@ private RequestInterceptor setup(Configuration conf, DistributedScheduler distributedScheduler) { NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class); Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l); - Context context = Mockito.mock(Context.class); NMContainerTokenSecretManager nmContainerTokenSecretManager = new NMContainerTokenSecretManager(conf); MasterKey mKey = new MasterKey() { @@ -207,15 +207,13 @@ public ByteBuffer getBytes() { public void setBytes(ByteBuffer bytes) {} }; nmContainerTokenSecretManager.setMasterKey(mKey); - Mockito.when(context.getContainerTokenSecretManager()).thenReturn - (nmContainerTokenSecretManager); OpportunisticContainerAllocator containerAllocator = - new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777); + new 
OpportunisticContainerAllocator(nmContainerTokenSecretManager, 77); NMTokenSecretManagerInNM nmTokenSecretManagerInNM = new NMTokenSecretManagerInNM(); nmTokenSecretManagerInNM.setMasterKey(mKey); - distributedScheduler.initLocal( + distributedScheduler.initLocal(1234, ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1), containerAllocator, nmTokenSecretManagerInNM, "test"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java deleted file mode 100644 index 843ac09..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java +++ /dev/null @@ -1,361 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
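The test rewiring above distills the refactoring: the allocator now needs only a container-token secret manager plus the NM web port (the NodeStatusUpdater/Context mocks go away), and initLocal() receives the RM identifier directly. A minimal sketch (port and submitter values illustrative):

    NMContainerTokenSecretManager tokenMgr =
        new NMContainerTokenSecretManager(conf);
    tokenMgr.setMasterKey(mKey);
    OpportunisticContainerAllocator allocator =
        new OpportunisticContainerAllocator(tokenMgr, 7777 /* NM web port */);
    distributedScheduler.initLocal(rmIdentifier,
        ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
        allocator, nmTokenSecretManagerInNM, "appSubmitter");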
- */ - -package org.apache.hadoop.yarn.server.resourcemanager; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol; -import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl; - - -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; - -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService; - -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor; - - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -/** - * The DistributedSchedulingAMService is started instead of the - * ApplicationMasterService if distributed scheduling is enabled for the YARN - * cluster. 
- * It extends the functionality of the ApplicationMasterService by servicing - * clients (AMs and AMRMProxy request interceptors) that understand the - * DistributedSchedulingProtocol. - */ -public class DistributedSchedulingAMService extends ApplicationMasterService - implements DistributedSchedulingAMProtocol, EventHandler { - - private static final Log LOG = - LogFactory.getLog(DistributedSchedulingAMService.class); - - private final NodeQueueLoadMonitor nodeMonitor; - - private final ConcurrentHashMap> rackToNode = - new ConcurrentHashMap<>(); - private final ConcurrentHashMap> hostToNode = - new ConcurrentHashMap<>(); - private final int k; - - public DistributedSchedulingAMService(RMContext rmContext, - YarnScheduler scheduler) { - super(DistributedSchedulingAMService.class.getName(), rmContext, scheduler); - this.k = rmContext.getYarnConfiguration().getInt( - YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED, - YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT); - long nodeSortInterval = rmContext.getYarnConfiguration().getLong( - YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, - YarnConfiguration. - NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT); - NodeQueueLoadMonitor.LoadComparator comparator = - NodeQueueLoadMonitor.LoadComparator.valueOf( - rmContext.getYarnConfiguration().get( - YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR, - YarnConfiguration. - NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT)); - - NodeQueueLoadMonitor topKSelector = - new NodeQueueLoadMonitor(nodeSortInterval, comparator); - - float sigma = rmContext.getYarnConfiguration() - .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV, - YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT); - - int limitMin, limitMax; - - if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) { - limitMin = rmContext.getYarnConfiguration() - .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH, - YarnConfiguration. - NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT); - limitMax = rmContext.getYarnConfiguration() - .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH, - YarnConfiguration. - NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT); - } else { - limitMin = rmContext.getYarnConfiguration() - .getInt( - YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS, - YarnConfiguration. - NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT); - limitMax = rmContext.getYarnConfiguration() - .getInt( - YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS, - YarnConfiguration. - NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT); - } - - topKSelector.initThresholdCalculator(sigma, limitMin, limitMax); - this.nodeMonitor = topKSelector; - } - - @Override - public Server getServer(YarnRPC rpc, Configuration serverConf, - InetSocketAddress addr, AMRMTokenSecretManager secretManager) { - Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this, - addr, serverConf, secretManager, - serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, - YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT)); - // To support application running on NMs that DO NOT support - // Dist Scheduling... 
The server multiplexes both the - // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol - ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, - ApplicationMasterProtocolPB.class, - ApplicationMasterProtocolService.newReflectiveBlockingService( - new ApplicationMasterProtocolPBServiceImpl(this))); - return server; - } - - @Override - public RegisterApplicationMasterResponse registerApplicationMaster - (RegisterApplicationMasterRequest request) throws YarnException, - IOException { - return super.registerApplicationMaster(request); - } - - @Override - public FinishApplicationMasterResponse finishApplicationMaster - (FinishApplicationMasterRequest request) throws YarnException, - IOException { - return super.finishApplicationMaster(request); - } - - @Override - public AllocateResponse allocate(AllocateRequest request) throws - YarnException, IOException { - return super.allocate(request); - } - - @Override - public RegisterDistributedSchedulingAMResponse - registerApplicationMasterForDistributedScheduling( - RegisterApplicationMasterRequest request) throws YarnException, - IOException { - RegisterApplicationMasterResponse response = - registerApplicationMaster(request); - RegisterDistributedSchedulingAMResponse dsResp = recordFactory - .newRecordInstance(RegisterDistributedSchedulingAMResponse.class); - dsResp.setRegisterResponse(response); - dsResp.setMinContainerResource( - Resource.newInstance( - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB, - YarnConfiguration. - DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT), - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES, - YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT) - ) - ); - dsResp.setMaxContainerResource( - Resource.newInstance( - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB, - YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT), - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES, - YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT) - ) - ); - dsResp.setIncrContainerResource( - Resource.newInstance( - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB, - YarnConfiguration. - DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT), - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES, - YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT) - ) - ); - dsResp.setContainerTokenExpiryInterval( - getConfig().getInt( - YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS, - YarnConfiguration. 
- DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT)); - dsResp.setContainerIdStart( - this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT); - - // Set nodes to be used for scheduling - dsResp.setNodesForScheduling( - this.nodeMonitor.selectLeastLoadedNodes(this.k)); - return dsResp; - } - - @Override - public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( - DistributedSchedulingAllocateRequest request) - throws YarnException, IOException { - List distAllocContainers = request.getAllocatedContainers(); - for (Container container : distAllocContainers) { - // Create RMContainer - SchedulerApplicationAttempt appAttempt = - ((AbstractYarnScheduler) rmContext.getScheduler()) - .getCurrentAttemptForContainer(container.getId()); - RMContainer rmContainer = new RMContainerImpl(container, - appAttempt.getApplicationAttemptId(), container.getNodeId(), - appAttempt.getUser(), rmContext, true); - appAttempt.addRMContainer(container.getId(), rmContainer); - rmContainer.handle( - new RMContainerEvent(container.getId(), - RMContainerEventType.LAUNCHED)); - } - AllocateResponse response = allocate(request.getAllocateRequest()); - DistributedSchedulingAllocateResponse dsResp = recordFactory - .newRecordInstance(DistributedSchedulingAllocateResponse.class); - dsResp.setAllocateResponse(response); - dsResp.setNodesForScheduling( - this.nodeMonitor.selectLeastLoadedNodes(this.k)); - return dsResp; - } - - private void addToMapping(ConcurrentHashMap> mapping, - String rackName, NodeId nodeId) { - if (rackName != null) { - mapping.putIfAbsent(rackName, new HashSet()); - Set nodeIds = mapping.get(rackName); - synchronized (nodeIds) { - nodeIds.add(nodeId); - } - } - } - - private void removeFromMapping(ConcurrentHashMap> mapping, - String rackName, NodeId nodeId) { - if (rackName != null) { - Set nodeIds = mapping.get(rackName); - synchronized (nodeIds) { - nodeIds.remove(nodeId); - } - } - } - - @Override - public void handle(SchedulerEvent event) { - switch (event.getType()) { - case NODE_ADDED: - if (!(event instanceof NodeAddedSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event; - nodeMonitor.addNode(nodeAddedEvent.getContainerReports(), - nodeAddedEvent.getAddedRMNode()); - addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(), - nodeAddedEvent.getAddedRMNode().getNodeID()); - addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(), - nodeAddedEvent.getAddedRMNode().getNodeID()); - break; - case NODE_REMOVED: - if (!(event instanceof NodeRemovedSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeRemovedSchedulerEvent nodeRemovedEvent = - (NodeRemovedSchedulerEvent) event; - nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode()); - removeFromMapping(rackToNode, - nodeRemovedEvent.getRemovedRMNode().getRackName(), - nodeRemovedEvent.getRemovedRMNode().getNodeID()); - removeFromMapping(hostToNode, - nodeRemovedEvent.getRemovedRMNode().getHostName(), - nodeRemovedEvent.getRemovedRMNode().getNodeID()); - break; - case NODE_UPDATE: - if (!(event instanceof NodeUpdateSchedulerEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent) - event; - nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode()); - break; - case NODE_RESOURCE_UPDATE: - if (!(event instanceof NodeResourceUpdateSchedulerEvent)) { - 
throw new RuntimeException("Unexpected event type: " + event); - } - NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = - (NodeResourceUpdateSchedulerEvent) event; - nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), - nodeResourceUpdatedEvent.getResourceOption()); - break; - - // <-- IGNORED EVENTS : START --> - case APP_ADDED: - break; - case APP_REMOVED: - break; - case APP_ATTEMPT_ADDED: - break; - case APP_ATTEMPT_REMOVED: - break; - case CONTAINER_EXPIRED: - break; - case NODE_LABELS_UPDATE: - break; - // <-- IGNORED EVENTS : END --> - default: - LOG.error("Unknown event arrived at DistributedSchedulingAMService: " - + event.toString()); - } - - } - - public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { - return nodeMonitor.getThresholdCalculator(); - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatingAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatingAMService.java new file mode 100644 index 0000000..5edef27 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatingAMService.java @@ -0,0 +1,367 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol; +import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl; + + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService; + +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor; + + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * The OpportunisticContainerAllocatingAMService is started instead of the + * ApplicationMasterService if distributed scheduling is enabled for the YARN + * cluster. 
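A note for reading the registration path below: the handshake hands the AM-side allocator its full sizing envelope and an initial node list, all sourced from the renamed OPPORTUNISTIC_CONTAINERS_* keys. What a client reads back under the shipped defaults, sketched with accessor names mirroring the setters used below:

    RegisterDistributedSchedulingAMResponse resp = proxy
        .registerApplicationMasterForDistributedScheduling(request);
    Resource min = resp.getMinContainerResource();   // default 512 MB, 1 vcore
    Resource max = resp.getMaxContainerResource();   // default 2048 MB, 4 vcores
    Resource incr = resp.getIncrContainerResource(); // default 512 MB, 1 vcore
    long expiryMs = resp.getContainerTokenExpiryInterval(); // default 600000
    List<NodeId> nodes = resp.getNodesForScheduling(); // least-loaded nodes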
+ * It extends the functionality of the ApplicationMasterService by servicing + * clients (AMs and AMRMProxy request interceptors) that understand the + * DistributedSchedulingProtocol. + */ +public class OpportunisticContainerAllocatingAMService + extends ApplicationMasterService implements DistributedSchedulingAMProtocol, + EventHandler { + + private static final Log LOG = + LogFactory.getLog(OpportunisticContainerAllocatingAMService.class); + + private final NodeQueueLoadMonitor nodeMonitor; + + private final ConcurrentHashMap> rackToNode = + new ConcurrentHashMap<>(); + private final ConcurrentHashMap> hostToNode = + new ConcurrentHashMap<>(); + private final int k; + + public OpportunisticContainerAllocatingAMService(RMContext rmContext, + YarnScheduler scheduler) { + super(OpportunisticContainerAllocatingAMService.class.getName(), + rmContext, scheduler); + this.k = rmContext.getYarnConfiguration().getInt( + YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED, + YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT); + long nodeSortInterval = rmContext.getYarnConfiguration().getLong( + YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, + YarnConfiguration. + NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT); + NodeQueueLoadMonitor.LoadComparator comparator = + NodeQueueLoadMonitor.LoadComparator.valueOf( + rmContext.getYarnConfiguration().get( + YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR, + YarnConfiguration. + NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT)); + + NodeQueueLoadMonitor topKSelector = + new NodeQueueLoadMonitor(nodeSortInterval, comparator); + + float sigma = rmContext.getYarnConfiguration() + .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV, + YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT); + + int limitMin, limitMax; + + if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) { + limitMin = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH, + YarnConfiguration. + NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT); + limitMax = rmContext.getYarnConfiguration() + .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH, + YarnConfiguration. + NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT); + } else { + limitMin = rmContext.getYarnConfiguration() + .getInt( + YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS, + YarnConfiguration. + NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT); + limitMax = rmContext.getYarnConfiguration() + .getInt( + YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS, + YarnConfiguration. + NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT); + } + + topKSelector.initThresholdCalculator(sigma, limitMin, limitMax); + this.nodeMonitor = topKSelector; + } + + @Override + public Server getServer(YarnRPC rpc, Configuration serverConf, + InetSocketAddress addr, AMRMTokenSecretManager secretManager) { + if (YarnConfiguration.isDistSchedulingEnabled(serverConf)) { + Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this, + addr, serverConf, secretManager, + serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT)); + // To support application running on NMs that DO NOT support + // Dist Scheduling... 
The server multiplexes both the + // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol + ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, + ApplicationMasterProtocolPB.class, + ApplicationMasterProtocolService.newReflectiveBlockingService( + new ApplicationMasterProtocolPBServiceImpl(this))); + return server; + } + return super.getServer(rpc, serverConf, addr, secretManager); + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster + (RegisterApplicationMasterRequest request) throws YarnException, + IOException { + return super.registerApplicationMaster(request); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster + (FinishApplicationMasterRequest request) throws YarnException, + IOException { + return super.finishApplicationMaster(request); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) throws + YarnException, IOException { + return super.allocate(request); + } + + @Override + public RegisterDistributedSchedulingAMResponse + registerApplicationMasterForDistributedScheduling( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + RegisterApplicationMasterResponse response = + registerApplicationMaster(request); + RegisterDistributedSchedulingAMResponse dsResp = recordFactory + .newRecordInstance(RegisterDistributedSchedulingAMResponse.class); + dsResp.setRegisterResponse(response); + dsResp.setMinContainerResource( + Resource.newInstance( + getConfig().getInt( + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB, + YarnConfiguration. + OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT), + getConfig().getInt( + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES, + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT) + ) + ); + dsResp.setMaxContainerResource( + Resource.newInstance( + getConfig().getInt( + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB, + YarnConfiguration + .OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT), + getConfig().getInt( + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES, + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT) + ) + ); + dsResp.setIncrContainerResource( + Resource.newInstance( + getConfig().getInt( + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB, + YarnConfiguration. + OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT), + getConfig().getInt( + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES, + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT) + ) + ); + dsResp.setContainerTokenExpiryInterval( + getConfig().getInt( + YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS, + YarnConfiguration. 
+ OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT)); + dsResp.setContainerIdStart( + this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT); + + // Set nodes to be used for scheduling + dsResp.setNodesForScheduling( + this.nodeMonitor.selectLeastLoadedNodes(this.k)); + return dsResp; + } + + @Override + public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( + DistributedSchedulingAllocateRequest request) + throws YarnException, IOException { + List distAllocContainers = request.getAllocatedContainers(); + for (Container container : distAllocContainers) { + // Create RMContainer + SchedulerApplicationAttempt appAttempt = + ((AbstractYarnScheduler) rmContext.getScheduler()) + .getCurrentAttemptForContainer(container.getId()); + RMContainer rmContainer = new RMContainerImpl(container, + appAttempt.getApplicationAttemptId(), container.getNodeId(), + appAttempt.getUser(), rmContext, true); + appAttempt.addRMContainer(container.getId(), rmContainer); + rmContainer.handle( + new RMContainerEvent(container.getId(), + RMContainerEventType.LAUNCHED)); + } + AllocateResponse response = allocate(request.getAllocateRequest()); + DistributedSchedulingAllocateResponse dsResp = recordFactory + .newRecordInstance(DistributedSchedulingAllocateResponse.class); + dsResp.setAllocateResponse(response); + dsResp.setNodesForScheduling( + this.nodeMonitor.selectLeastLoadedNodes(this.k)); + return dsResp; + } + + private void addToMapping(ConcurrentHashMap> mapping, + String rackName, NodeId nodeId) { + if (rackName != null) { + mapping.putIfAbsent(rackName, new HashSet()); + Set nodeIds = mapping.get(rackName); + synchronized (nodeIds) { + nodeIds.add(nodeId); + } + } + } + + private void removeFromMapping(ConcurrentHashMap> mapping, + String rackName, NodeId nodeId) { + if (rackName != null) { + Set nodeIds = mapping.get(rackName); + synchronized (nodeIds) { + nodeIds.remove(nodeId); + } + } + } + + @Override + public void handle(SchedulerEvent event) { + switch (event.getType()) { + case NODE_ADDED: + if (!(event instanceof NodeAddedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event; + nodeMonitor.addNode(nodeAddedEvent.getContainerReports(), + nodeAddedEvent.getAddedRMNode()); + addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(), + nodeAddedEvent.getAddedRMNode().getNodeID()); + addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(), + nodeAddedEvent.getAddedRMNode().getNodeID()); + break; + case NODE_REMOVED: + if (!(event instanceof NodeRemovedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeRemovedSchedulerEvent nodeRemovedEvent = + (NodeRemovedSchedulerEvent) event; + nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode()); + removeFromMapping(rackToNode, + nodeRemovedEvent.getRemovedRMNode().getRackName(), + nodeRemovedEvent.getRemovedRMNode().getNodeID()); + removeFromMapping(hostToNode, + nodeRemovedEvent.getRemovedRMNode().getHostName(), + nodeRemovedEvent.getRemovedRMNode().getNodeID()); + break; + case NODE_UPDATE: + if (!(event instanceof NodeUpdateSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent) + event; + nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode()); + break; + case NODE_RESOURCE_UPDATE: + if (!(event instanceof NodeResourceUpdateSchedulerEvent)) { + throw 
new RuntimeException("Unexpected event type: " + event); + } + NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = + (NodeResourceUpdateSchedulerEvent) event; + nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), + nodeResourceUpdatedEvent.getResourceOption()); + break; + + // <-- IGNORED EVENTS : START --> + case APP_ADDED: + break; + case APP_REMOVED: + break; + case APP_ATTEMPT_ADDED: + break; + case APP_ATTEMPT_REMOVED: + break; + case CONTAINER_EXPIRED: + break; + case NODE_LABELS_UPDATE: + break; + // <-- IGNORED EVENTS : END --> + default: + LOG.error("Unknown event arrived at" + + "OpportunisticContainerAllocatingAMService: " + event.toString()); + } + + } + + public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { + return nodeMonitor.getThresholdCalculator(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 4509045..5789051 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -116,7 +116,6 @@ import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApps; import org.apache.hadoop.yarn.webapp.WebApps.Builder; @@ -1177,24 +1176,27 @@ protected ClientRMService createClientRMService() { } protected ApplicationMasterService createApplicationMasterService() { - if (this.rmContext.getYarnConfiguration().getBoolean( - YarnConfiguration.DIST_SCHEDULING_ENABLED, - YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) { - DistributedSchedulingAMService distributedSchedulingService = new - DistributedSchedulingAMService(this.rmContext, scheduler); - EventDispatcher distSchedulerEventDispatcher = - new EventDispatcher(distributedSchedulingService, - DistributedSchedulingAMService.class.getName()); - // Add an event dispatcher for the DistributedSchedulingAMService - // to handle node updates/additions and removals. + Configuration config = this.rmContext.getYarnConfiguration(); + if (YarnConfiguration.isOpportunisticContainerAllocationEnabled(config) + || YarnConfiguration.isDistSchedulingEnabled(config)) { + OpportunisticContainerAllocatingAMService + oppContainerAllocatingAMService = + new OpportunisticContainerAllocatingAMService(this.rmContext, + scheduler); + EventDispatcher oppContainerAllocEventDispatcher = + new EventDispatcher(oppContainerAllocatingAMService, + OpportunisticContainerAllocatingAMService.class.getName()); + // Add an event dispatcher for the + // OpportunisticContainerAllocatingAMService to handle node + // updates/additions and removals. // Since the SchedulerEvent is currently a super set of theses, // we register interest for it.. 
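The allocateForDistributedScheduling() implementation above is the report-back half of the protocol: containers granted locally on the NM side ride along with the next allocate call, the RM mints matching RMContainers and moves them to LAUNCHED, and the response carries a refreshed least-loaded node list. Client-side shape, sketched with illustrative names:

    DistributedSchedulingAllocateRequest dsReq =
        Records.newRecord(DistributedSchedulingAllocateRequest.class);
    dsReq.setAllocateRequest(allocateRequest);      // regular GUARANTEED asks
    dsReq.setAllocatedContainers(locallyAllocated); // NM-granted containers
    DistributedSchedulingAllocateResponse dsResp =
        proxy.allocateForDistributedScheduling(dsReq);
    List<NodeId> refreshedNodes = dsResp.getNodesForScheduling();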
- addService(distSchedulerEventDispatcher); + addService(oppContainerAllocEventDispatcher); rmDispatcher.register(SchedulerEventType.class, - distSchedulerEventDispatcher); + oppContainerAllocEventDispatcher); this.rmContext.setContainerQueueLimitCalculator( - distributedSchedulingService.getNodeManagerQueueLimitCalculator()); - return distributedSchedulingService; + oppContainerAllocatingAMService.getNodeManagerQueueLimitCalculator()); + return oppContainerAllocatingAMService; } return new ApplicationMasterService(this.rmContext, scheduler); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 7d1b3c3..329c432 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -820,9 +820,10 @@ protected void serviceStop() { @Override protected ApplicationMasterService createApplicationMasterService() { if (this.rmContext.getYarnConfiguration().getBoolean( - YarnConfiguration.DIST_SCHEDULING_ENABLED, - YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) { - return new DistributedSchedulingAMService(getRMContext(), scheduler) { + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, + YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT)) { + return new OpportunisticContainerAllocatingAMService(getRMContext(), + scheduler) { @Override protected void serviceStart() { // override to not start rpc handler diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java deleted file mode 100644 index 0213a94..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java +++ /dev/null @@ -1,269 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
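As the MockRM override above shows, tests now opt in through the renamed flag rather than DIST_SCHEDULING_ENABLED. A minimal sketch:

    Configuration conf = new YarnConfiguration();
    conf.setBoolean(
        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
    MockRM rm = new MockRM(conf); // wires up the opportunistic AM service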
- */ - -package org.apache.hadoop.yarn.server.resourcemanager; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl; -import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; -import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; - -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; -import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl; -import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl; -import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; - -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Arrays; -import java.util.List; - -/** - * Test cases for {@link DistributedSchedulingAMService}. 
- */ -public class TestDistributedSchedulingAMService { - - // Test if the DistributedSchedulingAMService can handle both DSProtocol as - // well as AMProtocol clients - @Test - public void testRPCWrapping() throws Exception { - Configuration conf = new Configuration(); - conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class - .getName()); - YarnRPC rpc = YarnRPC.create(conf); - String bindAddr = "localhost:0"; - InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); - conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr); - final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null); - final RMContext rmContext = new RMContextImpl() { - @Override - public AMLivelinessMonitor getAMLivelinessMonitor() { - return null; - } - - @Override - public Configuration getYarnConfiguration() { - return new YarnConfiguration(); - } - }; - Container c = factory.newRecordInstance(Container.class); - c.setExecutionType(ExecutionType.OPPORTUNISTIC); - c.setId( - ContainerId.newContainerId( - ApplicationAttemptId.newInstance( - ApplicationId.newInstance(12345, 1), 2), 3)); - AllocateRequest allReq = - (AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class); - allReq.setAskList(Arrays.asList( - ResourceRequest.newInstance(Priority.UNDEFINED, "a", - Resource.newInstance(1, 2), 1, true, "exp", - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true)))); - DistributedSchedulingAMService service = - createService(factory, rmContext, c); - Server server = service.getServer(rpc, conf, addr, null); - server.start(); - - // Verify that the DistrubutedSchedulingService can handle vanilla - // ApplicationMasterProtocol clients - RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class, - ProtobufRpcEngine.class); - ApplicationMasterProtocolPB ampProxy = - RPC.getProxy(ApplicationMasterProtocolPB - .class, 1, NetUtils.getConnectAddress(server), conf); - RegisterApplicationMasterResponse regResp = - new RegisterApplicationMasterResponsePBImpl( - ampProxy.registerApplicationMaster(null, - ((RegisterApplicationMasterRequestPBImpl)factory - .newRecordInstance( - RegisterApplicationMasterRequest.class)).getProto())); - Assert.assertEquals("dummyQueue", regResp.getQueue()); - FinishApplicationMasterResponse finishResp = - new FinishApplicationMasterResponsePBImpl( - ampProxy.finishApplicationMaster(null, - ((FinishApplicationMasterRequestPBImpl)factory - .newRecordInstance( - FinishApplicationMasterRequest.class)).getProto() - )); - Assert.assertEquals(false, finishResp.getIsUnregistered()); - AllocateResponse allocResp = - new AllocateResponsePBImpl( - ampProxy.allocate(null, - ((AllocateRequestPBImpl)factory - .newRecordInstance(AllocateRequest.class)).getProto()) - ); - List allocatedContainers = allocResp.getAllocatedContainers(); - Assert.assertEquals(1, allocatedContainers.size()); - Assert.assertEquals(ExecutionType.OPPORTUNISTIC, - allocatedContainers.get(0).getExecutionType()); - Assert.assertEquals(12345, allocResp.getNumClusterNodes()); - - - // Verify that the DistrubutedSchedulingService can handle the - // DistributedSchedulingAMProtocol clients as well - RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class, - ProtobufRpcEngine.class); - DistributedSchedulingAMProtocolPB dsProxy = - RPC.getProxy(DistributedSchedulingAMProtocolPB - .class, 1, NetUtils.getConnectAddress(server), conf); - - RegisterDistributedSchedulingAMResponse dsRegResp = - new RegisterDistributedSchedulingAMResponsePBImpl( - 
dsProxy.registerApplicationMasterForDistributedScheduling(null, - ((RegisterApplicationMasterRequestPBImpl)factory - .newRecordInstance(RegisterApplicationMasterRequest.class)) - .getProto())); - Assert.assertEquals(54321l, dsRegResp.getContainerIdStart()); - Assert.assertEquals(4, - dsRegResp.getMaxContainerResource().getVirtualCores()); - Assert.assertEquals(1024, - dsRegResp.getMinContainerResource().getMemorySize()); - Assert.assertEquals(2, - dsRegResp.getIncrContainerResource().getVirtualCores()); - - DistributedSchedulingAllocateRequestPBImpl distAllReq = - (DistributedSchedulingAllocateRequestPBImpl)factory.newRecordInstance( - DistributedSchedulingAllocateRequest.class); - distAllReq.setAllocateRequest(allReq); - distAllReq.setAllocatedContainers(Arrays.asList(c)); - DistributedSchedulingAllocateResponse dsAllocResp = - new DistributedSchedulingAllocateResponsePBImpl( - dsProxy.allocateForDistributedScheduling(null, - distAllReq.getProto())); - Assert.assertEquals( - "h1", dsAllocResp.getNodesForScheduling().get(0).getHost()); - - FinishApplicationMasterResponse dsfinishResp = - new FinishApplicationMasterResponsePBImpl( - dsProxy.finishApplicationMaster(null, - ((FinishApplicationMasterRequestPBImpl) factory - .newRecordInstance(FinishApplicationMasterRequest.class)) - .getProto())); - Assert.assertEquals( - false, dsfinishResp.getIsUnregistered()); - } - - private DistributedSchedulingAMService createService(final RecordFactory - factory, final RMContext rmContext, final Container c) { - return new DistributedSchedulingAMService(rmContext, null) { - @Override - public RegisterApplicationMasterResponse registerApplicationMaster( - RegisterApplicationMasterRequest request) throws - YarnException, IOException { - RegisterApplicationMasterResponse resp = factory.newRecordInstance( - RegisterApplicationMasterResponse.class); - // Dummy Entry to Assert that we get this object back - resp.setQueue("dummyQueue"); - return resp; - } - - @Override - public FinishApplicationMasterResponse finishApplicationMaster( - FinishApplicationMasterRequest request) throws YarnException, - IOException { - FinishApplicationMasterResponse resp = factory.newRecordInstance( - FinishApplicationMasterResponse.class); - // Dummy Entry to Assert that we get this object back - resp.setIsUnregistered(false); - return resp; - } - - @Override - public AllocateResponse allocate(AllocateRequest request) throws - YarnException, IOException { - AllocateResponse response = factory.newRecordInstance( - AllocateResponse.class); - response.setNumClusterNodes(12345); - response.setAllocatedContainers(Arrays.asList(c)); - return response; - } - - @Override - public RegisterDistributedSchedulingAMResponse - registerApplicationMasterForDistributedScheduling( - RegisterApplicationMasterRequest request) - throws YarnException, IOException { - RegisterDistributedSchedulingAMResponse resp = factory - .newRecordInstance(RegisterDistributedSchedulingAMResponse.class); - resp.setContainerIdStart(54321L); - resp.setMaxContainerResource(Resource.newInstance(4096, 4)); - resp.setMinContainerResource(Resource.newInstance(1024, 1)); - resp.setIncrContainerResource(Resource.newInstance(2048, 2)); - return resp; - } - - @Override - public DistributedSchedulingAllocateResponse - allocateForDistributedScheduling( - DistributedSchedulingAllocateRequest request) - throws YarnException, IOException { - List askList = - request.getAllocateRequest().getAskList(); - List allocatedContainers = request.getAllocatedContainers(); - 
-        Assert.assertEquals(1, allocatedContainers.size());
-        Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
-            allocatedContainers.get(0).getExecutionType());
-        Assert.assertEquals(1, askList.size());
-        Assert.assertTrue(askList.get(0)
-            .getExecutionTypeRequest().getEnforceExecutionType());
-        DistributedSchedulingAllocateResponse resp = factory
-            .newRecordInstance(DistributedSchedulingAllocateResponse.class);
-        resp.setNodesForScheduling(
-            Arrays.asList(NodeId.newInstance("h1", 1234)));
-        return resp;
-      }
-    };
-  }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatingAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatingAMService.java
new file mode 100644
index 0000000..f15bf75
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatingAMService.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test cases for {@link OpportunisticContainerAllocatingAMService}.
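+ * The embedded mock service returns fixed, recognizable values (a dummy
+ * queue name, container-id start 54321L, node "h1") so each assertion can
+ * confirm that records survive the protobuf round-trip on both protocols.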
+ */
+public class TestOpportunisticContainerAllocatingAMService {
+
+  // Test if the OpportunisticContainerAllocatingAMService can handle both
+  // DSProtocol as well as AMProtocol clients
+  @Test
+  public void testRPCWrapping() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
+        .getName());
+    YarnRPC rpc = YarnRPC.create(conf);
+    String bindAddr = "localhost:0";
+    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+    conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr);
+    final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
+    final RMContext rmContext = new RMContextImpl() {
+      @Override
+      public AMLivelinessMonitor getAMLivelinessMonitor() {
+        return null;
+      }
+
+      @Override
+      public Configuration getYarnConfiguration() {
+        return new YarnConfiguration();
+      }
+    };
+    Container c = factory.newRecordInstance(Container.class);
+    c.setExecutionType(ExecutionType.OPPORTUNISTIC);
+    c.setId(
+        ContainerId.newContainerId(
+            ApplicationAttemptId.newInstance(
+                ApplicationId.newInstance(12345, 1), 2), 3));
+    AllocateRequest allReq =
+        (AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class);
+    allReq.setAskList(Arrays.asList(
+        ResourceRequest.newInstance(Priority.UNDEFINED, "a",
+            Resource.newInstance(1, 2), 1, true, "exp",
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true))));
+    OpportunisticContainerAllocatingAMService service =
+        createService(factory, rmContext, c);
+    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
+    Server server = service.getServer(rpc, conf, addr, null);
+    server.start();
+
+    // Verify that the OpportunisticContainerAllocatingAMService can handle
+    // vanilla ApplicationMasterProtocol clients
+    RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
+        ProtobufRpcEngine.class);
+    ApplicationMasterProtocolPB ampProxy =
+        RPC.getProxy(ApplicationMasterProtocolPB
+            .class, 1, NetUtils.getConnectAddress(server), conf);
+    RegisterApplicationMasterResponse regResp =
+        new RegisterApplicationMasterResponsePBImpl(
+            ampProxy.registerApplicationMaster(null,
+                ((RegisterApplicationMasterRequestPBImpl)factory
+                    .newRecordInstance(
+                        RegisterApplicationMasterRequest.class)).getProto()));
+    Assert.assertEquals("dummyQueue", regResp.getQueue());
+    FinishApplicationMasterResponse finishResp =
+        new FinishApplicationMasterResponsePBImpl(
+            ampProxy.finishApplicationMaster(null,
+                ((FinishApplicationMasterRequestPBImpl)factory
+                    .newRecordInstance(
+                        FinishApplicationMasterRequest.class)).getProto()
+            ));
+    Assert.assertEquals(false, finishResp.getIsUnregistered());
+    AllocateResponse allocResp =
+        new AllocateResponsePBImpl(
+            ampProxy.allocate(null,
+                ((AllocateRequestPBImpl)factory
+                    .newRecordInstance(AllocateRequest.class)).getProto())
+        );
+    List<Container> allocatedContainers = allocResp.getAllocatedContainers();
+    Assert.assertEquals(1, allocatedContainers.size());
+    Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+        allocatedContainers.get(0).getExecutionType());
+    Assert.assertEquals(12345, allocResp.getNumClusterNodes());
+
+
+    // Verify that the OpportunisticContainerAllocatingAMService can handle
+    // the DistributedSchedulingAMProtocol clients as well
+    RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class,
+        ProtobufRpcEngine.class);
+    DistributedSchedulingAMProtocolPB dsProxy =
+        RPC.getProxy(DistributedSchedulingAMProtocolPB
+            .class, 1, NetUtils.getConnectAddress(server), conf);
+
+    RegisterDistributedSchedulingAMResponse dsRegResp =
+        new RegisterDistributedSchedulingAMResponsePBImpl(
+            dsProxy.registerApplicationMasterForDistributedScheduling(null,
+                ((RegisterApplicationMasterRequestPBImpl)factory
+                    .newRecordInstance(RegisterApplicationMasterRequest.class))
+                    .getProto()));
+    Assert.assertEquals(54321L, dsRegResp.getContainerIdStart());
+    Assert.assertEquals(4,
+        dsRegResp.getMaxContainerResource().getVirtualCores());
+    Assert.assertEquals(1024,
+        dsRegResp.getMinContainerResource().getMemorySize());
+    Assert.assertEquals(2,
+        dsRegResp.getIncrContainerResource().getVirtualCores());
+
+    DistributedSchedulingAllocateRequestPBImpl distAllReq =
+        (DistributedSchedulingAllocateRequestPBImpl)factory.newRecordInstance(
+            DistributedSchedulingAllocateRequest.class);
+    distAllReq.setAllocateRequest(allReq);
+    distAllReq.setAllocatedContainers(Arrays.asList(c));
+    DistributedSchedulingAllocateResponse dsAllocResp =
+        new DistributedSchedulingAllocateResponsePBImpl(
+            dsProxy.allocateForDistributedScheduling(null,
+                distAllReq.getProto()));
+    Assert.assertEquals(
+        "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
+
+    FinishApplicationMasterResponse dsfinishResp =
+        new FinishApplicationMasterResponsePBImpl(
+            dsProxy.finishApplicationMaster(null,
+                ((FinishApplicationMasterRequestPBImpl) factory
+                    .newRecordInstance(FinishApplicationMasterRequest.class))
+                    .getProto()));
+    Assert.assertEquals(
+        false, dsfinishResp.getIsUnregistered());
+  }
+
+  private OpportunisticContainerAllocatingAMService createService(
+      final RecordFactory factory, final RMContext rmContext,
+      final Container c) {
+    return new OpportunisticContainerAllocatingAMService(rmContext, null) {
+      @Override
+      public RegisterApplicationMasterResponse registerApplicationMaster(
+          RegisterApplicationMasterRequest request) throws
+          YarnException, IOException {
+        RegisterApplicationMasterResponse resp = factory.newRecordInstance(
+            RegisterApplicationMasterResponse.class);
+        // Dummy Entry to Assert that we get this object back
+        resp.setQueue("dummyQueue");
+        return resp;
+      }
+
+      @Override
+      public FinishApplicationMasterResponse finishApplicationMaster(
+          FinishApplicationMasterRequest request) throws YarnException,
+          IOException {
+        FinishApplicationMasterResponse resp = factory.newRecordInstance(
+            FinishApplicationMasterResponse.class);
+        // Dummy Entry to Assert that we get this object back
+        resp.setIsUnregistered(false);
+        return resp;
+      }
+
+      @Override
+      public AllocateResponse allocate(AllocateRequest request) throws
+          YarnException, IOException {
+        AllocateResponse response = factory.newRecordInstance(
+            AllocateResponse.class);
+        response.setNumClusterNodes(12345);
+        response.setAllocatedContainers(Arrays.asList(c));
+        return response;
+      }
+
+      @Override
+      public RegisterDistributedSchedulingAMResponse
+          registerApplicationMasterForDistributedScheduling(
+          RegisterApplicationMasterRequest request)
+          throws YarnException, IOException {
+        RegisterDistributedSchedulingAMResponse resp = factory
+            .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
+        resp.setContainerIdStart(54321L);
+        resp.setMaxContainerResource(Resource.newInstance(4096, 4));
+        resp.setMinContainerResource(Resource.newInstance(1024, 1));
+        resp.setIncrContainerResource(Resource.newInstance(2048, 2));
+        return resp;
+      }
+
+      @Override
+      public DistributedSchedulingAllocateResponse
+          allocateForDistributedScheduling(
+          DistributedSchedulingAllocateRequest request)
+          throws YarnException, IOException {
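+        // Sanity-check that the ask list and the allocated containers sent
+        // by the client arrived intact before echoing back a node list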
+        List<ResourceRequest> askList =
+            request.getAllocateRequest().getAskList();
+        List<Container> allocatedContainers = request.getAllocatedContainers();
+        Assert.assertEquals(1, allocatedContainers.size());
+        Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+            allocatedContainers.get(0).getExecutionType());
+        Assert.assertEquals(1, askList.size());
+        Assert.assertTrue(askList.get(0)
+            .getExecutionTypeRequest().getEnforceExecutionType());
+        DistributedSchedulingAllocateResponse resp = factory
+            .newRecordInstance(DistributedSchedulingAllocateResponse.class);
+        resp.setNodesForScheduling(
+            Arrays.asList(NodeId.newInstance("h1", 1234)));
+        return resp;
+      }
+    };
+  }
+}