+ new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
+ @Override
+ public ApplicationMasterProtocol run() throws Exception {
+ setAMRMTokenService(conf);
+ return ClientRMProxy.createRMProxy(conf,
+ ApplicationMasterProtocol.class);
+ }
+ });
+ }
+ }
+
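For context, the hunk above completes a doAs block: the proxy to the RM is created as the application's user so that RPC calls carry that user's AMRMToken. A minimal standalone sketch of the same pattern, assuming the caller holds a UserGroupInformation for the application (the helper class and method names are illustrative, not part of this patch):

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.client.ClientRMProxy;

// Hypothetical helper mirroring the doAs pattern in the hunk above.
final class RMProxySketch {
  static ApplicationMasterProtocol createProxy(
      final Configuration conf, UserGroupInformation appUser)
      throws Exception {
    // Create the proxy as the app user so the connection is
    // authenticated with that user's AMRMToken.
    return appUser.doAs(
        new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
          @Override
          public ApplicationMasterProtocol run() throws Exception {
            return ClientRMProxy.createRMProxy(conf,
                ApplicationMasterProtocol.class);
          }
        });
  }
}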
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
final RegisterApplicationMasterRequest request)
@@ -127,9 +145,15 @@ public AllocateResponse allocate(final AllocateRequest request)
registerApplicationMasterForDistributedScheduling
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
- LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" +
- "request to the real YARN RM");
- return rmClient.registerApplicationMasterForDistributedScheduling(request);
+ if (getApplicationContext().getNMCotext()
+ .isDistributedSchedulingEnabled()) {
+ LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" +
+ "request to the real YARN RM");
+ return ((DistributedSchedulingAMProtocol)rmClient)
+ .registerApplicationMasterForDistributedScheduling(request);
+ } else {
+ throw new YarnException("Distributed Scheduling is not enabled !!");
+ }
}
@Override
@@ -140,13 +164,18 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
LOG.debug("Forwarding allocateForDistributedScheduling request" +
"to the real YARN RM");
}
- DistributedSchedulingAllocateResponse allocateResponse =
- rmClient.allocateForDistributedScheduling(request);
- if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
- updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
+ if (getApplicationContext().getNMCotext()
+ .isDistributedSchedulingEnabled()) {
+ DistributedSchedulingAllocateResponse allocateResponse =
+ ((DistributedSchedulingAMProtocol)rmClient)
+ .allocateForDistributedScheduling(request);
+ if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
+ updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
+ }
+ return allocateResponse;
+ } else {
+ throw new YarnException("Distributed Scheduling is not enabled !!");
}
-
- return allocateResponse;
}
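Both methods above share one pattern: rmClient is typed as ApplicationMasterProtocol, and only when the NM context reports distributed scheduling enabled is it safe to downcast it to DistributedSchedulingAMProtocol (the proxy then points at a server that speaks both protocols, as the RM-side getServer() later in this patch shows). A minimal sketch of that guard, using the same YARN types; the helper itself is illustrative:

import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;

final class GuardedCastSketch {
  // Downcast the AM protocol proxy only behind the capability check.
  static DistributedSchedulingAMProtocol asDistScheduling(
      ApplicationMasterProtocol rmClient, boolean distSchedulingEnabled)
      throws YarnException {
    if (!distSchedulingEnabled) {
      throw new YarnException("Distributed Scheduling is not enabled!");
    }
    // Safe only because the proxy targets a server that multiplexes
    // both ApplicationMasterProtocol and DistributedSchedulingAMProtocol.
    return (DistributedSchedulingAMProtocol) rmClient;
  }
}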
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
index bfb12ee..368858c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
@@ -32,34 +32,23 @@
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;
-
-
-
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
/**
* The DistributedScheduler runs on the NodeManager and is modeled as an
@@ -76,74 +65,49 @@
*/
public final class DistributedScheduler extends AbstractRequestInterceptor {
- static class PartitionedResourceRequests {
- private List guaranteed = new ArrayList<>();
- private List opportunistic = new ArrayList<>();
- public List getGuaranteed() {
- return guaranteed;
- }
- public List getOpportunistic() {
- return opportunistic;
- }
- }
-
- static class DistributedSchedulerParams {
- Resource maxResource;
- Resource minResource;
- Resource incrementResource;
- int containerTokenExpiryInterval;
- }
-
private static final Logger LOG = LoggerFactory
.getLogger(DistributedScheduler.class);
private final static RecordFactory RECORD_FACTORY =
RecordFactoryProvider.getRecordFactory(null);
- // Currently just used to keep track of allocated containers.
- // Can be used for reporting stats later.
- private Set containersAllocated = new HashSet<>();
-
- private DistributedSchedulerParams appParams =
- new DistributedSchedulerParams();
- private final OpportunisticContainerAllocator.ContainerIdCounter
- containerIdCounter =
- new OpportunisticContainerAllocator.ContainerIdCounter();
- private Map nodeList = new LinkedHashMap<>();
-
- // Mapping of NodeId to NodeTokens. Populated either from RM response or
- // generated locally if required.
- private Map nodeTokens = new HashMap<>();
- final Set blacklist = new HashSet<>();
-
- // This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
- // Resource Name (Host/rack/any) and capability. This mapping is required
- // to match a received Container to an outstanding OPPORTUNISTIC
- // ResourceRequest (ask).
- final TreeMap>
- outstandingOpReqs = new TreeMap<>();
+ private OpportunisticContainerContext oppContainerContext =
+ new OpportunisticContainerContext();
private ApplicationAttemptId applicationAttemptId;
private OpportunisticContainerAllocator containerAllocator;
private NMTokenSecretManagerInNM nmSecretManager;
private String appSubmitter;
-
- public void init(AMRMProxyApplicationContext appContext) {
- super.init(appContext);
- initLocal(appContext.getApplicationAttemptId(),
- appContext.getNMCotext().getContainerAllocator(),
- appContext.getNMCotext().getNMTokenSecretManager(),
- appContext.getUser());
+ private long rmIdentifier;
+
+ public void init(AMRMProxyApplicationContext applicationContext) {
+ super.init(applicationContext);
+ initLocal(applicationContext.getNMCotext().getNodeStatusUpdater()
+ .getRMIdentifier(),
+ applicationContext.getApplicationAttemptId(),
+ applicationContext.getNMCotext().getContainerAllocator(),
+ applicationContext.getNMCotext().getNMTokenSecretManager(),
+ applicationContext.getUser());
}
@VisibleForTesting
- void initLocal(ApplicationAttemptId applicationAttemptId,
- OpportunisticContainerAllocator containerAllocator,
+ void initLocal(long rmId, ApplicationAttemptId appAttemptId,
+ OpportunisticContainerAllocator oppContainerAllocator,
NMTokenSecretManagerInNM nmSecretManager, String appSubmitter) {
- this.applicationAttemptId = applicationAttemptId;
- this.containerAllocator = containerAllocator;
+ this.rmIdentifier = rmId;
+ this.applicationAttemptId = appAttemptId;
+ this.containerAllocator = oppContainerAllocator;
this.nmSecretManager = nmSecretManager;
this.appSubmitter = appSubmitter;
+
+ // Overrides the Generator to decrement container id.
+ this.oppContainerContext.setContainerIdGenerator(
+ new OpportunisticContainerAllocator.ContainerIdGenerator() {
+ @Override
+ public long generateContainerId() {
+ return this.containerIdCounter.decrementAndGet();
+ }
+ });
}
/**
@@ -202,7 +166,8 @@ private void updateResponseWithNMTokens(AllocateResponse response,
if (allocatedContainers.size() > 0) {
response.getAllocatedContainers().addAll(allocatedContainers);
for (Container alloc : allocatedContainers) {
- if (!nodeTokens.containsKey(alloc.getNodeId())) {
+ if (!oppContainerContext.getNodeTokens().containsKey(
+ alloc.getNodeId())) {
newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc));
}
}
@@ -212,115 +177,34 @@ private void updateResponseWithNMTokens(AllocateResponse response,
}
}
- private PartitionedResourceRequests partitionAskList(List<ResourceRequest>
- askList) {
- PartitionedResourceRequests partitionedRequests =
- new PartitionedResourceRequests();
- for (ResourceRequest rr : askList) {
- if (rr.getExecutionTypeRequest().getExecutionType() ==
- ExecutionType.OPPORTUNISTIC) {
- partitionedRequests.getOpportunistic().add(rr);
- } else {
- partitionedRequests.getGuaranteed().add(rr);
- }
- }
- return partitionedRequests;
- }
-
private void updateParameters(
RegisterDistributedSchedulingAMResponse registerResponse) {
- appParams.minResource = registerResponse.getMinContainerResource();
- appParams.maxResource = registerResponse.getMaxContainerResource();
- appParams.incrementResource =
- registerResponse.getIncrContainerResource();
- if (appParams.incrementResource == null) {
- appParams.incrementResource = appParams.minResource;
+ oppContainerContext.getAppParams().setMinResource(
+ registerResponse.getMinContainerResource());
+ oppContainerContext.getAppParams().setMaxResource(
+ registerResponse.getMaxContainerResource());
+ oppContainerContext.getAppParams().setIncrementResource(
+ registerResponse.getIncrContainerResource());
+ if (oppContainerContext.getAppParams().getIncrementResource() == null) {
+ oppContainerContext.getAppParams().setIncrementResource(
+ oppContainerContext.getAppParams().getMinResource());
}
- appParams.containerTokenExpiryInterval = registerResponse
- .getContainerTokenExpiryInterval();
+ oppContainerContext.getAppParams().setContainerTokenExpiryInterval(
+ registerResponse.getContainerTokenExpiryInterval());
- containerIdCounter
+ oppContainerContext.getContainerIdGenerator()
.resetContainerIdCounter(registerResponse.getContainerIdStart());
setNodeList(registerResponse.getNodesForScheduling());
}
- /**
- * Takes a list of ResourceRequests (asks), extracts the key information viz.
- * (Priority, ResourceName, Capability) and adds to the outstanding
- * OPPORTUNISTIC outstandingOpReqs map. The nested map is required to enforce
- * the current YARN constraint that only a single ResourceRequest can exist at
- * a given Priority and Capability.
- *
- * @param resourceAsks the list with the {@link ResourceRequest}s
- */
- public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
- for (ResourceRequest request : resourceAsks) {
- Priority priority = request.getPriority();
-
- // TODO: Extend for Node/Rack locality. We only handle ANY requests now
- if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
- continue;
- }
-
- if (request.getNumContainers() == 0) {
- continue;
- }
-
- Map<Resource, ResourceRequest> reqMap =
- this.outstandingOpReqs.get(priority);
- if (reqMap == null) {
- reqMap = new HashMap<>();
- this.outstandingOpReqs.put(priority, reqMap);
- }
-
- ResourceRequest resourceRequest = reqMap.get(request.getCapability());
- if (resourceRequest == null) {
- resourceRequest = request;
- reqMap.put(request.getCapability(), request);
- } else {
- resourceRequest.setNumContainers(
- resourceRequest.getNumContainers() + request.getNumContainers());
- }
- if (ResourceRequest.isAnyLocation(request.getResourceName())) {
- LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
- + ", with capability = " + request.getCapability() + " ) : "
- + resourceRequest.getNumContainers());
- }
- }
- }
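The deleted bookkeeping nested one map inside another: Priority maps to a (Capability -> ResourceRequest) map, and duplicate asks at the same (Priority, Capability) merge by summing container counts. A compact sketch of that structure, which this patch assumes now lives in OpportunisticContainerContext:

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

// Illustrative re-creation of the outstanding-request map removed above.
final class OutstandingOpReqsSketch {
  private final TreeMap<Priority, Map<Resource, ResourceRequest>> reqs =
      new TreeMap<>();

  void add(ResourceRequest ask) {
    Map<Resource, ResourceRequest> byCapability =
        reqs.computeIfAbsent(ask.getPriority(), p -> new HashMap<>());
    ResourceRequest existing = byCapability.get(ask.getCapability());
    if (existing == null) {
      // Single ResourceRequest per (Priority, Capability).
      byCapability.put(ask.getCapability(), ask);
    } else {
      // Merging a duplicate just bumps the container count.
      existing.setNumContainers(
          existing.getNumContainers() + ask.getNumContainers());
    }
  }
}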
-
- /**
- * This method matches a returned list of Container Allocations to any
- * outstanding OPPORTUNISTIC ResourceRequest.
- */
- private void matchAllocationToOutstandingRequest(Resource capability,
- List<Container> allocatedContainers) {
- for (Container c : allocatedContainers) {
- containersAllocated.add(c.getId());
- Map<Resource, ResourceRequest> asks =
- outstandingOpReqs.get(c.getPriority());
-
- if (asks == null)
- continue;
-
- ResourceRequest rr = asks.get(capability);
- if (rr != null) {
- rr.setNumContainers(rr.getNumContainers() - 1);
- if (rr.getNumContainers() == 0) {
- asks.remove(capability);
- }
- }
- }
- }
-
private void setNodeList(List<NodeId> nodeList) {
- this.nodeList.clear();
+ oppContainerContext.getNodeMap().clear();
addToNodeList(nodeList);
}
private void addToNodeList(List<NodeId> nodes) {
for (NodeId n : nodes) {
- this.nodeList.put(n.getHost(), n);
+ oppContainerContext.getNodeMap().put(n.getHost(), n);
}
}
@@ -345,52 +229,13 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
LOG.debug("Forwarding allocate request to the" +
"Distributed Scheduler Service on YARN RM");
}
- // Partition requests into GUARANTEED and OPPORTUNISTIC reqs
- PartitionedResourceRequests partitionedAsks =
- partitionAskList(request.getAllocateRequest().getAskList());
-
- List<ContainerId> releasedContainers =
- request.getAllocateRequest().getReleaseList();
- int numReleasedContainers = releasedContainers.size();
- if (numReleasedContainers > 0) {
- LOG.info("AttemptID: " + applicationAttemptId + " released: "
- + numReleasedContainers);
- containersAllocated.removeAll(releasedContainers);
- }
-
- // Also, update black list
- ResourceBlacklistRequest rbr =
- request.getAllocateRequest().getResourceBlacklistRequest();
- if (rbr != null) {
- blacklist.removeAll(rbr.getBlacklistRemovals());
- blacklist.addAll(rbr.getBlacklistAdditions());
- }
-
- // Add OPPORTUNISTIC reqs to the outstanding reqs
- addToOutstandingReqs(partitionedAsks.getOpportunistic());
-
- List allocatedContainers = new ArrayList<>();
- for (Priority priority : outstandingOpReqs.descendingKeySet()) {
- // Allocated containers :
- // Key = Requested Capability,
- // Value = List of Containers of given Cap (The actual container size
- // might be different than what is requested.. which is why
- // we need the requested capability (key) to match against
- // the outstanding reqs)
- Map> allocated =
- containerAllocator.allocate(this.appParams, containerIdCounter,
- outstandingOpReqs.get(priority).values(), blacklist,
- applicationAttemptId, nodeList, appSubmitter);
- for (Map.Entry> e : allocated.entrySet()) {
- matchAllocationToOutstandingRequest(e.getKey(), e.getValue());
- allocatedContainers.addAll(e.getValue());
- }
- }
+ List<Container> allocatedContainers =
+ containerAllocator.allocateContainers(
+ request.getAllocateRequest(), applicationAttemptId,
+ oppContainerContext, rmIdentifier, appSubmitter);
request.setAllocatedContainers(allocatedContainers);
- // Send all the GUARANTEED Reqs to RM
- request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
DistributedSchedulingAllocateResponse dsResp =
getNextInterceptor().allocateForDistributedScheduling(request);
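The replacement above collapses the interceptor's inline partition/match/blacklist bookkeeping into a single allocateContainers() call on the shared allocator. Conceptually the first step is still splitting asks by execution type, as the deleted partitionAskList() did: OPPORTUNISTIC asks are satisfied locally on the NM, everything else is forwarded to the RM. A sketch of that split, with hypothetical names (the moved allocator's actual private structure may differ):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

final class AskPartitionSketch {
  static final class PartitionedAsks {
    final List<ResourceRequest> guaranteed = new ArrayList<>();
    final List<ResourceRequest> opportunistic = new ArrayList<>();
  }

  static PartitionedAsks partition(List<ResourceRequest> asks) {
    PartitionedAsks out = new PartitionedAsks();
    for (ResourceRequest rr : asks) {
      if (rr.getExecutionTypeRequest().getExecutionType()
          == ExecutionType.OPPORTUNISTIC) {
        out.opportunistic.add(rr);  // handled locally on the NM
      } else {
        out.guaranteed.add(rr);     // sent on to the YARN RM
      }
    }
    return out;
  }
}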
@@ -398,7 +243,7 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
setNodeList(dsResp.getNodesForScheduling());
List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens();
for (NMToken nmToken : nmTokens) {
- nodeTokens.put(nmToken.getNodeId(), nmToken);
+ oppContainerContext.getNodeTokens().put(nmToken.getNodeId(), nmToken);
}
List<ContainerStatus> completedContainers =
@@ -407,7 +252,8 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
// Only account for opportunistic containers
for (ContainerStatus cs : completedContainers) {
if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
- containersAllocated.remove(cs.getContainerId());
+ oppContainerContext.getContainersAllocated()
+ .remove(cs.getContainerId());
}
}
@@ -417,9 +263,9 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Number of opportunistic containers currently allocated by" +
- "application: " + containersAllocated.size());
+ LOG.debug("Number of opportunistic containers currently" +
+ "allocated by application: " + oppContainerContext
+ .getContainersAllocated().size());
}
return dsResp;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
deleted file mode 100644
index 4723233..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.scheduler;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.api.ContainerType;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler.DistributedSchedulerParams;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import java.net.InetSocketAddress;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- *
- * The OpportunisticContainerAllocator allocates containers on a given list of
- * nodes, after modifying the container sizes to respect the limits set by the
- * ResourceManager. It tries to distribute the containers as evenly as possible.
- * It also uses the NMTokenSecretManagerInNM to generate the
- * required NM tokens for the allocated containers.
- *
- */
-public class OpportunisticContainerAllocator {
-
- private static final Log LOG =
- LogFactory.getLog(OpportunisticContainerAllocator.class);
-
- private static final ResourceCalculator RESOURCE_CALCULATOR =
- new DominantResourceCalculator();
-
- static class ContainerIdCounter {
- final AtomicLong containerIdCounter = new AtomicLong(1);
-
- void resetContainerIdCounter(long containerIdStart) {
- this.containerIdCounter.set(containerIdStart);
- }
-
- long generateContainerId() {
- return this.containerIdCounter.decrementAndGet();
- }
- }
-
- private final NodeStatusUpdater nodeStatusUpdater;
- private final Context context;
- private int webpagePort;
-
- public OpportunisticContainerAllocator(NodeStatusUpdater nodeStatusUpdater,
- Context context, int webpagePort) {
- this.nodeStatusUpdater = nodeStatusUpdater;
- this.context = context;
- this.webpagePort = webpagePort;
- }
-
- public Map> allocate(
- DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
- Collection resourceAsks, Set blacklist,
- ApplicationAttemptId appAttId, Map allNodes,
- String userName) throws YarnException {
- Map> containers = new HashMap<>();
- for (ResourceRequest anyAsk : resourceAsks) {
- allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId,
- allNodes, userName, containers, anyAsk);
- LOG.info("Opportunistic allocation requested for ["
- + "priority=" + anyAsk.getPriority()
- + ", num_containers=" + anyAsk.getNumContainers()
- + ", capability=" + anyAsk.getCapability() + "]"
- + " allocated = " + containers.get(anyAsk.getCapability()).size());
- }
- return containers;
- }
-
- private void allocateOpportunisticContainers(
- DistributedSchedulerParams appParams, ContainerIdCounter idCounter,
- Set<String> blacklist, ApplicationAttemptId id,
- Map<String, NodeId> allNodes, String userName,
- Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
- throws YarnException {
- int toAllocate = anyAsk.getNumContainers()
- - (containers.isEmpty() ? 0 :
- containers.get(anyAsk.getCapability()).size());
-
- List nodesForScheduling = new ArrayList<>();
- for (Entry nodeEntry : allNodes.entrySet()) {
- // Do not use blacklisted nodes for scheduling.
- if (blacklist.contains(nodeEntry.getKey())) {
- continue;
- }
- nodesForScheduling.add(nodeEntry.getValue());
- }
- int numAllocated = 0;
- int nextNodeToSchedule = 0;
- for (int numCont = 0; numCont < toAllocate; numCont++) {
- nextNodeToSchedule++;
- nextNodeToSchedule %= nodesForScheduling.size();
- NodeId nodeId = nodesForScheduling.get(nextNodeToSchedule);
- Container container = buildContainer(appParams, idCounter, anyAsk, id,
- userName, nodeId);
- List<Container> cList = containers.get(anyAsk.getCapability());
- if (cList == null) {
- cList = new ArrayList<>();
- containers.put(anyAsk.getCapability(), cList);
- }
- cList.add(container);
- numAllocated++;
- LOG.info("Allocated [" + container.getId() + "] as opportunistic.");
- }
- LOG.info("Allocated " + numAllocated + " opportunistic containers.");
- }
-
- private Container buildContainer(DistributedSchedulerParams appParams,
- ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id,
- String userName, NodeId nodeId) throws YarnException {
- ContainerId cId =
- ContainerId.newContainerId(id, idCounter.generateContainerId());
-
- // Normalize the resource asks (similar to what the RM scheduler does
- // before accepting an ask)
- Resource capability = normalizeCapability(appParams, rr);
-
- long currTime = System.currentTimeMillis();
- ContainerTokenIdentifier containerTokenIdentifier =
- new ContainerTokenIdentifier(
- cId, nodeId.getHost() + ":" + nodeId.getPort(), userName,
- capability, currTime + appParams.containerTokenExpiryInterval,
- context.getContainerTokenSecretManager().getCurrentKey().getKeyId(),
- nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime,
- null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
- ExecutionType.OPPORTUNISTIC);
- byte[] pwd =
- context.getContainerTokenSecretManager().createPassword(
- containerTokenIdentifier);
- Token containerToken = newContainerToken(nodeId, pwd,
- containerTokenIdentifier);
- Container container = BuilderUtils.newContainer(
- cId, nodeId, nodeId.getHost() + ":" + webpagePort,
- capability, rr.getPriority(), containerToken,
- containerTokenIdentifier.getExecutionType(),
- rr.getAllocationRequestId());
- return container;
- }
-
- private Resource normalizeCapability(DistributedSchedulerParams appParams,
- ResourceRequest ask) {
- return Resources.normalize(RESOURCE_CALCULATOR,
- ask.getCapability(), appParams.minResource, appParams.maxResource,
- appParams.incrementResource);
- }
-
- public static Token newContainerToken(NodeId nodeId, byte[] password,
- ContainerTokenIdentifier tokenIdentifier) {
- // RPC layer client expects ip:port as service for tokens
- InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(),
- nodeId.getPort());
- // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
- Token containerToken = Token.newInstance(tokenIdentifier.getBytes(),
- ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
- .buildTokenService(addr).toString());
- return containerToken;
- }
-
-}
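One piece of the deleted file worth keeping in mind: before building a container, the allocator normalized each ask against the min/max/increment resources the RM supplied at registration, mirroring what the RM scheduler does before accepting an ask. A runnable sketch of that normalization (the values are illustrative; the moved class under org.apache.hadoop.yarn.server.scheduler is assumed to retain equivalent logic):

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

public final class NormalizeCapabilityDemo {
  public static void main(String[] args) {
    ResourceCalculator rc = new DominantResourceCalculator();
    Resource ask = Resource.newInstance(1100, 1);   // 1100 MB, 1 vcore
    Resource min = Resource.newInstance(512, 1);
    Resource max = Resource.newInstance(4096, 4);
    Resource increment = Resource.newInstance(512, 1);
    // Rounds the ask up to the next increment and clamps to [min, max]:
    // the 1100 MB ask becomes 1536 MB here.
    Resource normalized = Resources.normalize(rc, ask, min, max, increment);
    System.out.println("normalized = " + normalized);
  }
}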
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index f716d44..d8660dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -67,7 +67,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
-import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
index b093b3b..8f1ae7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestDistributedScheduler.java
@@ -38,11 +38,12 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
@@ -189,7 +190,6 @@ private RequestInterceptor setup(Configuration conf,
DistributedScheduler distributedScheduler) {
NodeStatusUpdater nodeStatusUpdater = Mockito.mock(NodeStatusUpdater.class);
Mockito.when(nodeStatusUpdater.getRMIdentifier()).thenReturn(12345l);
- Context context = Mockito.mock(Context.class);
NMContainerTokenSecretManager nmContainerTokenSecretManager = new
NMContainerTokenSecretManager(conf);
MasterKey mKey = new MasterKey() {
@@ -207,15 +207,13 @@ public ByteBuffer getBytes() {
public void setBytes(ByteBuffer bytes) {}
};
nmContainerTokenSecretManager.setMasterKey(mKey);
- Mockito.when(context.getContainerTokenSecretManager()).thenReturn
- (nmContainerTokenSecretManager);
OpportunisticContainerAllocator containerAllocator =
- new OpportunisticContainerAllocator(nodeStatusUpdater, context, 7777);
+ new OpportunisticContainerAllocator(nmContainerTokenSecretManager, 77);
NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
new NMTokenSecretManagerInNM();
nmTokenSecretManagerInNM.setMasterKey(mKey);
- distributedScheduler.initLocal(
+ distributedScheduler.initLocal(1234,
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
containerAllocator, nmTokenSecretManagerInNM, "test");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
deleted file mode 100644
index 843ac09..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingAMService.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
-import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
-
-
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
-
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
-
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * The DistributedSchedulingAMService is started instead of the
- * ApplicationMasterService if distributed scheduling is enabled for the YARN
- * cluster.
- * It extends the functionality of the ApplicationMasterService by servicing
- * clients (AMs and AMRMProxy request interceptors) that understand the
- * DistributedSchedulingProtocol.
- */
-public class DistributedSchedulingAMService extends ApplicationMasterService
- implements DistributedSchedulingAMProtocol, EventHandler<SchedulerEvent> {
-
- private static final Log LOG =
- LogFactory.getLog(DistributedSchedulingAMService.class);
-
- private final NodeQueueLoadMonitor nodeMonitor;
-
- private final ConcurrentHashMap> rackToNode =
- new ConcurrentHashMap<>();
- private final ConcurrentHashMap> hostToNode =
- new ConcurrentHashMap<>();
- private final int k;
-
- public DistributedSchedulingAMService(RMContext rmContext,
- YarnScheduler scheduler) {
- super(DistributedSchedulingAMService.class.getName(), rmContext, scheduler);
- this.k = rmContext.getYarnConfiguration().getInt(
- YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED,
- YarnConfiguration.DIST_SCHEDULING_NODES_NUMBER_USED_DEFAULT);
- long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
- YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
- YarnConfiguration.
- NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT);
- NodeQueueLoadMonitor.LoadComparator comparator =
- NodeQueueLoadMonitor.LoadComparator.valueOf(
- rmContext.getYarnConfiguration().get(
- YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR,
- YarnConfiguration.
- NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT));
-
- NodeQueueLoadMonitor topKSelector =
- new NodeQueueLoadMonitor(nodeSortInterval, comparator);
-
- float sigma = rmContext.getYarnConfiguration()
- .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV,
- YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT);
-
- int limitMin, limitMax;
-
- if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) {
- limitMin = rmContext.getYarnConfiguration()
- .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH,
- YarnConfiguration.
- NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT);
- limitMax = rmContext.getYarnConfiguration()
- .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH,
- YarnConfiguration.
- NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT);
- } else {
- limitMin = rmContext.getYarnConfiguration()
- .getInt(
- YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS,
- YarnConfiguration.
- NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT);
- limitMax = rmContext.getYarnConfiguration()
- .getInt(
- YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS,
- YarnConfiguration.
- NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT);
- }
-
- topKSelector.initThresholdCalculator(sigma, limitMin, limitMax);
- this.nodeMonitor = topKSelector;
- }
-
- @Override
- public Server getServer(YarnRPC rpc, Configuration serverConf,
- InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
- Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this,
- addr, serverConf, secretManager,
- serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
- // To support application running on NMs that DO NOT support
- // Dist Scheduling... The server multiplexes both the
- // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol
- ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
- ApplicationMasterProtocolPB.class,
- ApplicationMasterProtocolService.newReflectiveBlockingService(
- new ApplicationMasterProtocolPBServiceImpl(this)));
- return server;
- }
-
- @Override
- public RegisterApplicationMasterResponse registerApplicationMaster
- (RegisterApplicationMasterRequest request) throws YarnException,
- IOException {
- return super.registerApplicationMaster(request);
- }
-
- @Override
- public FinishApplicationMasterResponse finishApplicationMaster
- (FinishApplicationMasterRequest request) throws YarnException,
- IOException {
- return super.finishApplicationMaster(request);
- }
-
- @Override
- public AllocateResponse allocate(AllocateRequest request) throws
- YarnException, IOException {
- return super.allocate(request);
- }
-
- @Override
- public RegisterDistributedSchedulingAMResponse
- registerApplicationMasterForDistributedScheduling(
- RegisterApplicationMasterRequest request) throws YarnException,
- IOException {
- RegisterApplicationMasterResponse response =
- registerApplicationMaster(request);
- RegisterDistributedSchedulingAMResponse dsResp = recordFactory
- .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
- dsResp.setRegisterResponse(response);
- dsResp.setMinContainerResource(
- Resource.newInstance(
- getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB,
- YarnConfiguration.
- DIST_SCHEDULING_MIN_CONTAINER_MEMORY_MB_DEFAULT),
- getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES,
- YarnConfiguration.DIST_SCHEDULING_MIN_CONTAINER_VCORES_DEFAULT)
- )
- );
- dsResp.setMaxContainerResource(
- Resource.newInstance(
- getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB,
- YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_MB_DEFAULT),
- getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES,
- YarnConfiguration.DIST_SCHEDULING_MAX_CONTAINER_VCORES_DEFAULT)
- )
- );
- dsResp.setIncrContainerResource(
- Resource.newInstance(
- getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB,
- YarnConfiguration.
- DIST_SCHEDULING_INCR_CONTAINER_MEMORY_MB_DEFAULT),
- getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES,
- YarnConfiguration.DIST_SCHEDULING_INCR_CONTAINER_VCORES_DEFAULT)
- )
- );
- dsResp.setContainerTokenExpiryInterval(
- getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS,
- YarnConfiguration.
- DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT));
- dsResp.setContainerIdStart(
- this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
-
- // Set nodes to be used for scheduling
- dsResp.setNodesForScheduling(
- this.nodeMonitor.selectLeastLoadedNodes(this.k));
- return dsResp;
- }
-
- @Override
- public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
- DistributedSchedulingAllocateRequest request)
- throws YarnException, IOException {
- List<Container> distAllocContainers = request.getAllocatedContainers();
- for (Container container : distAllocContainers) {
- // Create RMContainer
- SchedulerApplicationAttempt appAttempt =
- ((AbstractYarnScheduler) rmContext.getScheduler())
- .getCurrentAttemptForContainer(container.getId());
- RMContainer rmContainer = new RMContainerImpl(container,
- appAttempt.getApplicationAttemptId(), container.getNodeId(),
- appAttempt.getUser(), rmContext, true);
- appAttempt.addRMContainer(container.getId(), rmContainer);
- rmContainer.handle(
- new RMContainerEvent(container.getId(),
- RMContainerEventType.LAUNCHED));
- }
- AllocateResponse response = allocate(request.getAllocateRequest());
- DistributedSchedulingAllocateResponse dsResp = recordFactory
- .newRecordInstance(DistributedSchedulingAllocateResponse.class);
- dsResp.setAllocateResponse(response);
- dsResp.setNodesForScheduling(
- this.nodeMonitor.selectLeastLoadedNodes(this.k));
- return dsResp;
- }
-
- private void addToMapping(ConcurrentHashMap> mapping,
- String rackName, NodeId nodeId) {
- if (rackName != null) {
- mapping.putIfAbsent(rackName, new HashSet());
- Set nodeIds = mapping.get(rackName);
- synchronized (nodeIds) {
- nodeIds.add(nodeId);
- }
- }
- }
-
- private void removeFromMapping(ConcurrentHashMap> mapping,
- String rackName, NodeId nodeId) {
- if (rackName != null) {
- Set nodeIds = mapping.get(rackName);
- synchronized (nodeIds) {
- nodeIds.remove(nodeId);
- }
- }
- }
-
- @Override
- public void handle(SchedulerEvent event) {
- switch (event.getType()) {
- case NODE_ADDED:
- if (!(event instanceof NodeAddedSchedulerEvent)) {
- throw new RuntimeException("Unexpected event type: " + event);
- }
- NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
- nodeMonitor.addNode(nodeAddedEvent.getContainerReports(),
- nodeAddedEvent.getAddedRMNode());
- addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
- nodeAddedEvent.getAddedRMNode().getNodeID());
- addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(),
- nodeAddedEvent.getAddedRMNode().getNodeID());
- break;
- case NODE_REMOVED:
- if (!(event instanceof NodeRemovedSchedulerEvent)) {
- throw new RuntimeException("Unexpected event type: " + event);
- }
- NodeRemovedSchedulerEvent nodeRemovedEvent =
- (NodeRemovedSchedulerEvent) event;
- nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
- removeFromMapping(rackToNode,
- nodeRemovedEvent.getRemovedRMNode().getRackName(),
- nodeRemovedEvent.getRemovedRMNode().getNodeID());
- removeFromMapping(hostToNode,
- nodeRemovedEvent.getRemovedRMNode().getHostName(),
- nodeRemovedEvent.getRemovedRMNode().getNodeID());
- break;
- case NODE_UPDATE:
- if (!(event instanceof NodeUpdateSchedulerEvent)) {
- throw new RuntimeException("Unexpected event type: " + event);
- }
- NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)
- event;
- nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode());
- break;
- case NODE_RESOURCE_UPDATE:
- if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
- throw new RuntimeException("Unexpected event type: " + event);
- }
- NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
- (NodeResourceUpdateSchedulerEvent) event;
- nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
- nodeResourceUpdatedEvent.getResourceOption());
- break;
-
- // <-- IGNORED EVENTS : START -->
- case APP_ADDED:
- break;
- case APP_REMOVED:
- break;
- case APP_ATTEMPT_ADDED:
- break;
- case APP_ATTEMPT_REMOVED:
- break;
- case CONTAINER_EXPIRED:
- break;
- case NODE_LABELS_UPDATE:
- break;
- // <-- IGNORED EVENTS : END -->
- default:
- LOG.error("Unknown event arrived at DistributedSchedulingAMService: "
- + event.toString());
- }
-
- }
-
- public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
- return nodeMonitor.getThresholdCalculator();
- }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatingAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatingAMService.java
new file mode 100644
index 0000000..5edef27
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatingAMService.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
+import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
+
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The OpportunisticContainerAllocatingAMService is started instead of the
+ * ApplicationMasterService if distributed scheduling is enabled for the YARN
+ * cluster.
+ * It extends the functionality of the ApplicationMasterService by servicing
+ * clients (AMs and AMRMProxy request interceptors) that understand the
+ * DistributedSchedulingProtocol.
+ */
+public class OpportunisticContainerAllocatingAMService
+ extends ApplicationMasterService implements DistributedSchedulingAMProtocol,
+ EventHandler<SchedulerEvent> {
+
+ private static final Log LOG =
+ LogFactory.getLog(OpportunisticContainerAllocatingAMService.class);
+
+ private final NodeQueueLoadMonitor nodeMonitor;
+
+ private final ConcurrentHashMap> rackToNode =
+ new ConcurrentHashMap<>();
+ private final ConcurrentHashMap> hostToNode =
+ new ConcurrentHashMap<>();
+ private final int k;
+
+ public OpportunisticContainerAllocatingAMService(RMContext rmContext,
+ YarnScheduler scheduler) {
+ super(OpportunisticContainerAllocatingAMService.class.getName(),
+ rmContext, scheduler);
+ this.k = rmContext.getYarnConfiguration().getInt(
+ YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
+ YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT);
+ long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
+ YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
+ YarnConfiguration.
+ NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT);
+ NodeQueueLoadMonitor.LoadComparator comparator =
+ NodeQueueLoadMonitor.LoadComparator.valueOf(
+ rmContext.getYarnConfiguration().get(
+ YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR,
+ YarnConfiguration.
+ NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT));
+
+ NodeQueueLoadMonitor topKSelector =
+ new NodeQueueLoadMonitor(nodeSortInterval, comparator);
+
+ float sigma = rmContext.getYarnConfiguration()
+ .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV,
+ YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT);
+
+ int limitMin, limitMax;
+
+ if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) {
+ limitMin = rmContext.getYarnConfiguration()
+ .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH,
+ YarnConfiguration.
+ NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT);
+ limitMax = rmContext.getYarnConfiguration()
+ .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH,
+ YarnConfiguration.
+ NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT);
+ } else {
+ limitMin = rmContext.getYarnConfiguration()
+ .getInt(
+ YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS,
+ YarnConfiguration.
+ NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT);
+ limitMax = rmContext.getYarnConfiguration()
+ .getInt(
+ YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS,
+ YarnConfiguration.
+ NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT);
+ }
+
+ topKSelector.initThresholdCalculator(sigma, limitMin, limitMax);
+ this.nodeMonitor = topKSelector;
+ }
+
+ @Override
+ public Server getServer(YarnRPC rpc, Configuration serverConf,
+ InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
+ if (YarnConfiguration.isDistSchedulingEnabled(serverConf)) {
+ Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this,
+ addr, serverConf, secretManager,
+ serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
+ // To support application running on NMs that DO NOT support
+ // Dist Scheduling... The server multiplexes both the
+ // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol
+ ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+ ApplicationMasterProtocolPB.class,
+ ApplicationMasterProtocolService.newReflectiveBlockingService(
+ new ApplicationMasterProtocolPBServiceImpl(this)));
+ return server;
+ }
+ return super.getServer(rpc, serverConf, addr, secretManager);
+ }
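The new guard above means the RM only exposes DistributedSchedulingAMProtocol when distributed scheduling is switched on; otherwise getServer() falls back to the plain ApplicationMasterService behavior. A small sketch of flipping that switch programmatically, under the assumption that YarnConfiguration.DIST_SCHEDULING_ENABLED is the constant backing isDistSchedulingEnabled() in this version (in a real cluster it would be set in yarn-site.xml instead):

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public final class DistSchedulingFlagDemo {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    // Assumed constant name; verify against your Hadoop version.
    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
    // true -> getServer() above also registers the distributed
    // scheduling protocol on the AM service's RPC server.
    System.out.println(YarnConfiguration.isDistSchedulingEnabled(conf));
  }
}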
+
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster
+ (RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return super.registerApplicationMaster(request);
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster
+ (FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return super.finishApplicationMaster(request);
+ }
+
+ @Override
+ public AllocateResponse allocate(AllocateRequest request) throws
+ YarnException, IOException {
+ return super.allocate(request);
+ }
+
+ @Override
+ public RegisterDistributedSchedulingAMResponse
+ registerApplicationMasterForDistributedScheduling(
+ RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ RegisterApplicationMasterResponse response =
+ registerApplicationMaster(request);
+ RegisterDistributedSchedulingAMResponse dsResp = recordFactory
+ .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
+ dsResp.setRegisterResponse(response);
+ dsResp.setMinContainerResource(
+ Resource.newInstance(
+ getConfig().getInt(
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB,
+ YarnConfiguration.
+ OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT),
+ getConfig().getInt(
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES,
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT)
+ )
+ );
+ dsResp.setMaxContainerResource(
+ Resource.newInstance(
+ getConfig().getInt(
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB,
+ YarnConfiguration
+ .OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT),
+ getConfig().getInt(
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES,
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT)
+ )
+ );
+ dsResp.setIncrContainerResource(
+ Resource.newInstance(
+ getConfig().getInt(
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB,
+ YarnConfiguration.
+ OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT),
+ getConfig().getInt(
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES,
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT)
+ )
+ );
+ dsResp.setContainerTokenExpiryInterval(
+ getConfig().getInt(
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
+ YarnConfiguration.
+ OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT));
+ dsResp.setContainerIdStart(
+ this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
+
+ // Set nodes to be used for scheduling
+ dsResp.setNodesForScheduling(
+ this.nodeMonitor.selectLeastLoadedNodes(this.k));
+ return dsResp;
+ }
+
+ @Override
+ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
+ DistributedSchedulingAllocateRequest request)
+ throws YarnException, IOException {
+ List<Container> distAllocContainers = request.getAllocatedContainers();
+ for (Container container : distAllocContainers) {
+ // Create RMContainer
+ SchedulerApplicationAttempt appAttempt =
+ ((AbstractYarnScheduler) rmContext.getScheduler())
+ .getCurrentAttemptForContainer(container.getId());
+ RMContainer rmContainer = new RMContainerImpl(container,
+ appAttempt.getApplicationAttemptId(), container.getNodeId(),
+ appAttempt.getUser(), rmContext, true);
+ appAttempt.addRMContainer(container.getId(), rmContainer);
+ rmContainer.handle(
+ new RMContainerEvent(container.getId(),
+ RMContainerEventType.LAUNCHED));
+ }
+ AllocateResponse response = allocate(request.getAllocateRequest());
+ DistributedSchedulingAllocateResponse dsResp = recordFactory
+ .newRecordInstance(DistributedSchedulingAllocateResponse.class);
+ dsResp.setAllocateResponse(response);
+ dsResp.setNodesForScheduling(
+ this.nodeMonitor.selectLeastLoadedNodes(this.k));
+ return dsResp;
+ }
+
+ private void addToMapping(ConcurrentHashMap> mapping,
+ String rackName, NodeId nodeId) {
+ if (rackName != null) {
+ mapping.putIfAbsent(rackName, new HashSet());
+ Set nodeIds = mapping.get(rackName);
+ synchronized (nodeIds) {
+ nodeIds.add(nodeId);
+ }
+ }
+ }
+
+ private void removeFromMapping(ConcurrentHashMap> mapping,
+ String rackName, NodeId nodeId) {
+ if (rackName != null) {
+ Set nodeIds = mapping.get(rackName);
+ synchronized (nodeIds) {
+ nodeIds.remove(nodeId);
+ }
+ }
+ }
+
+ @Override
+ public void handle(SchedulerEvent event) {
+ switch (event.getType()) {
+ case NODE_ADDED:
+ if (!(event instanceof NodeAddedSchedulerEvent)) {
+ throw new RuntimeException("Unexpected event type: " + event);
+ }
+ NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
+ nodeMonitor.addNode(nodeAddedEvent.getContainerReports(),
+ nodeAddedEvent.getAddedRMNode());
+ addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
+ nodeAddedEvent.getAddedRMNode().getNodeID());
+ addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(),
+ nodeAddedEvent.getAddedRMNode().getNodeID());
+ break;
+ case NODE_REMOVED:
+ if (!(event instanceof NodeRemovedSchedulerEvent)) {
+ throw new RuntimeException("Unexpected event type: " + event);
+ }
+ NodeRemovedSchedulerEvent nodeRemovedEvent =
+ (NodeRemovedSchedulerEvent) event;
+ nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
+ removeFromMapping(rackToNode,
+ nodeRemovedEvent.getRemovedRMNode().getRackName(),
+ nodeRemovedEvent.getRemovedRMNode().getNodeID());
+ removeFromMapping(hostToNode,
+ nodeRemovedEvent.getRemovedRMNode().getHostName(),
+ nodeRemovedEvent.getRemovedRMNode().getNodeID());
+ break;
+ case NODE_UPDATE:
+ if (!(event instanceof NodeUpdateSchedulerEvent)) {
+ throw new RuntimeException("Unexpected event type: " + event);
+ }
+ NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)
+ event;
+ nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode());
+ break;
+ case NODE_RESOURCE_UPDATE:
+ if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
+ throw new RuntimeException("Unexpected event type: " + event);
+ }
+ NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
+ (NodeResourceUpdateSchedulerEvent) event;
+ nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
+ nodeResourceUpdatedEvent.getResourceOption());
+ break;
+
+ // <-- IGNORED EVENTS : START -->
+ case APP_ADDED:
+ break;
+ case APP_REMOVED:
+ break;
+ case APP_ATTEMPT_ADDED:
+ break;
+ case APP_ATTEMPT_REMOVED:
+ break;
+ case CONTAINER_EXPIRED:
+ break;
+ case NODE_LABELS_UPDATE:
+ break;
+ // <-- IGNORED EVENTS : END -->
+ default:
+ LOG.error("Unknown event arrived at" +
+ "OpportunisticContainerAllocatingAMService: " + event.toString());
+ }
+
+ }
+
+ public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
+ return nodeMonitor.getThresholdCalculator();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 4509045..5789051 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -116,7 +116,6 @@
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.hadoop.yarn.webapp.WebApps.Builder;
@@ -1177,24 +1176,27 @@ protected ClientRMService createClientRMService() {
}
protected ApplicationMasterService createApplicationMasterService() {
- if (this.rmContext.getYarnConfiguration().getBoolean(
- YarnConfiguration.DIST_SCHEDULING_ENABLED,
- YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
- DistributedSchedulingAMService distributedSchedulingService = new
- DistributedSchedulingAMService(this.rmContext, scheduler);
- EventDispatcher distSchedulerEventDispatcher =
- new EventDispatcher(distributedSchedulingService,
- DistributedSchedulingAMService.class.getName());
- // Add an event dispatcher for the DistributedSchedulingAMService
- // to handle node updates/additions and removals.
+ Configuration config = this.rmContext.getYarnConfiguration();
+ if (YarnConfiguration.isOpportunisticContainerAllocationEnabled(config)
+ || YarnConfiguration.isDistSchedulingEnabled(config)) {
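+ // OpportunisticContainerAllocatingAMService extends the plain
+ // ApplicationMasterService, so it can stand in for it whenever
+ // opportunistic allocation or distributed scheduling is enabled.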
+ OpportunisticContainerAllocatingAMService
+ oppContainerAllocatingAMService =
+ new OpportunisticContainerAllocatingAMService(this.rmContext,
+ scheduler);
+ EventDispatcher oppContainerAllocEventDispatcher =
+ new EventDispatcher(oppContainerAllocatingAMService,
+ OpportunisticContainerAllocatingAMService.class.getName());
+ // Add an event dispatcher for the
+ // OpportunisticContainerAllocatingAMService to handle node
+ // updates/additions and removals.
// Since SchedulerEvent is currently a superset of these,
// we register interest for it.
- addService(distSchedulerEventDispatcher);
+ addService(oppContainerAllocEventDispatcher);
rmDispatcher.register(SchedulerEventType.class,
- distSchedulerEventDispatcher);
+ oppContainerAllocEventDispatcher);
this.rmContext.setContainerQueueLimitCalculator(
- distributedSchedulingService.getNodeManagerQueueLimitCalculator());
- return distributedSchedulingService;
+ oppContainerAllocatingAMService.getNodeManagerQueueLimitCalculator());
+ return oppContainerAllocatingAMService;
}
return new ApplicationMasterService(this.rmContext, scheduler);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 7d1b3c3..329c432 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -820,9 +820,10 @@ protected void serviceStop() {
@Override
protected ApplicationMasterService createApplicationMasterService() {
if (this.rmContext.getYarnConfiguration().getBoolean(
- YarnConfiguration.DIST_SCHEDULING_ENABLED,
- YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
- return new DistributedSchedulingAMService(getRMContext(), scheduler) {
+ YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED,
+ YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT)) {
+ return new OpportunisticContainerAllocatingAMService(getRMContext(),
+ scheduler) {
@Override
protected void serviceStart() {
// override to not start rpc handler
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
deleted file mode 100644
index 0213a94..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingAMService.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Test cases for {@link DistributedSchedulingAMService}.
- */
-public class TestDistributedSchedulingAMService {
-
- // Test if the DistributedSchedulingAMService can handle both DSProtocol as
- // well as AMProtocol clients
- @Test
- public void testRPCWrapping() throws Exception {
- Configuration conf = new Configuration();
- conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
- .getName());
- YarnRPC rpc = YarnRPC.create(conf);
- String bindAddr = "localhost:0";
- InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
- conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr);
- final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
- final RMContext rmContext = new RMContextImpl() {
- @Override
- public AMLivelinessMonitor getAMLivelinessMonitor() {
- return null;
- }
-
- @Override
- public Configuration getYarnConfiguration() {
- return new YarnConfiguration();
- }
- };
- Container c = factory.newRecordInstance(Container.class);
- c.setExecutionType(ExecutionType.OPPORTUNISTIC);
- c.setId(
- ContainerId.newContainerId(
- ApplicationAttemptId.newInstance(
- ApplicationId.newInstance(12345, 1), 2), 3));
- AllocateRequest allReq =
- (AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class);
- allReq.setAskList(Arrays.asList(
- ResourceRequest.newInstance(Priority.UNDEFINED, "a",
- Resource.newInstance(1, 2), 1, true, "exp",
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true))));
- DistributedSchedulingAMService service =
- createService(factory, rmContext, c);
- Server server = service.getServer(rpc, conf, addr, null);
- server.start();
-
- // Verify that the DistrubutedSchedulingService can handle vanilla
- // ApplicationMasterProtocol clients
- RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
- ProtobufRpcEngine.class);
- ApplicationMasterProtocolPB ampProxy =
- RPC.getProxy(ApplicationMasterProtocolPB
- .class, 1, NetUtils.getConnectAddress(server), conf);
- RegisterApplicationMasterResponse regResp =
- new RegisterApplicationMasterResponsePBImpl(
- ampProxy.registerApplicationMaster(null,
- ((RegisterApplicationMasterRequestPBImpl)factory
- .newRecordInstance(
- RegisterApplicationMasterRequest.class)).getProto()));
- Assert.assertEquals("dummyQueue", regResp.getQueue());
- FinishApplicationMasterResponse finishResp =
- new FinishApplicationMasterResponsePBImpl(
- ampProxy.finishApplicationMaster(null,
- ((FinishApplicationMasterRequestPBImpl)factory
- .newRecordInstance(
- FinishApplicationMasterRequest.class)).getProto()
- ));
- Assert.assertEquals(false, finishResp.getIsUnregistered());
- AllocateResponse allocResp =
- new AllocateResponsePBImpl(
- ampProxy.allocate(null,
- ((AllocateRequestPBImpl)factory
- .newRecordInstance(AllocateRequest.class)).getProto())
- );
- List allocatedContainers = allocResp.getAllocatedContainers();
- Assert.assertEquals(1, allocatedContainers.size());
- Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
- allocatedContainers.get(0).getExecutionType());
- Assert.assertEquals(12345, allocResp.getNumClusterNodes());
-
-
- // Verify that the DistrubutedSchedulingService can handle the
- // DistributedSchedulingAMProtocol clients as well
- RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class,
- ProtobufRpcEngine.class);
- DistributedSchedulingAMProtocolPB dsProxy =
- RPC.getProxy(DistributedSchedulingAMProtocolPB
- .class, 1, NetUtils.getConnectAddress(server), conf);
-
- RegisterDistributedSchedulingAMResponse dsRegResp =
- new RegisterDistributedSchedulingAMResponsePBImpl(
- dsProxy.registerApplicationMasterForDistributedScheduling(null,
- ((RegisterApplicationMasterRequestPBImpl)factory
- .newRecordInstance(RegisterApplicationMasterRequest.class))
- .getProto()));
- Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
- Assert.assertEquals(4,
- dsRegResp.getMaxContainerResource().getVirtualCores());
- Assert.assertEquals(1024,
- dsRegResp.getMinContainerResource().getMemorySize());
- Assert.assertEquals(2,
- dsRegResp.getIncrContainerResource().getVirtualCores());
-
- DistributedSchedulingAllocateRequestPBImpl distAllReq =
- (DistributedSchedulingAllocateRequestPBImpl)factory.newRecordInstance(
- DistributedSchedulingAllocateRequest.class);
- distAllReq.setAllocateRequest(allReq);
- distAllReq.setAllocatedContainers(Arrays.asList(c));
- DistributedSchedulingAllocateResponse dsAllocResp =
- new DistributedSchedulingAllocateResponsePBImpl(
- dsProxy.allocateForDistributedScheduling(null,
- distAllReq.getProto()));
- Assert.assertEquals(
- "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
-
- FinishApplicationMasterResponse dsfinishResp =
- new FinishApplicationMasterResponsePBImpl(
- dsProxy.finishApplicationMaster(null,
- ((FinishApplicationMasterRequestPBImpl) factory
- .newRecordInstance(FinishApplicationMasterRequest.class))
- .getProto()));
- Assert.assertEquals(
- false, dsfinishResp.getIsUnregistered());
- }
-
- private DistributedSchedulingAMService createService(final RecordFactory
- factory, final RMContext rmContext, final Container c) {
- return new DistributedSchedulingAMService(rmContext, null) {
- @Override
- public RegisterApplicationMasterResponse registerApplicationMaster(
- RegisterApplicationMasterRequest request) throws
- YarnException, IOException {
- RegisterApplicationMasterResponse resp = factory.newRecordInstance(
- RegisterApplicationMasterResponse.class);
- // Dummy Entry to Assert that we get this object back
- resp.setQueue("dummyQueue");
- return resp;
- }
-
- @Override
- public FinishApplicationMasterResponse finishApplicationMaster(
- FinishApplicationMasterRequest request) throws YarnException,
- IOException {
- FinishApplicationMasterResponse resp = factory.newRecordInstance(
- FinishApplicationMasterResponse.class);
- // Dummy Entry to Assert that we get this object back
- resp.setIsUnregistered(false);
- return resp;
- }
-
- @Override
- public AllocateResponse allocate(AllocateRequest request) throws
- YarnException, IOException {
- AllocateResponse response = factory.newRecordInstance(
- AllocateResponse.class);
- response.setNumClusterNodes(12345);
- response.setAllocatedContainers(Arrays.asList(c));
- return response;
- }
-
- @Override
- public RegisterDistributedSchedulingAMResponse
- registerApplicationMasterForDistributedScheduling(
- RegisterApplicationMasterRequest request)
- throws YarnException, IOException {
- RegisterDistributedSchedulingAMResponse resp = factory
- .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
- resp.setContainerIdStart(54321L);
- resp.setMaxContainerResource(Resource.newInstance(4096, 4));
- resp.setMinContainerResource(Resource.newInstance(1024, 1));
- resp.setIncrContainerResource(Resource.newInstance(2048, 2));
- return resp;
- }
-
- @Override
- public DistributedSchedulingAllocateResponse
- allocateForDistributedScheduling(
- DistributedSchedulingAllocateRequest request)
- throws YarnException, IOException {
- List askList =
- request.getAllocateRequest().getAskList();
- List allocatedContainers = request.getAllocatedContainers();
- Assert.assertEquals(1, allocatedContainers.size());
- Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
- allocatedContainers.get(0).getExecutionType());
- Assert.assertEquals(1, askList.size());
- Assert.assertTrue(askList.get(0)
- .getExecutionTypeRequest().getEnforceExecutionType());
- DistributedSchedulingAllocateResponse resp = factory
- .newRecordInstance(DistributedSchedulingAllocateResponse.class);
- resp.setNodesForScheduling(
- Arrays.asList(NodeId.newInstance("h1", 1234)));
- return resp;
- }
- };
- }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatingAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatingAMService.java
new file mode 100644
index 0000000..f15bf75
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatingAMService.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test cases for {@link OpportunisticContainerAllocatingAMService}.
+ */
+public class TestOpportunisticContainerAllocatingAMService {
+
+ // Test that the OpportunisticContainerAllocatingAMService can handle
+ // both DistributedSchedulingAMProtocol and ApplicationMasterProtocol
+ // clients.
+ @Test
+ public void testRPCWrapping() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
+ .getName());
+ YarnRPC rpc = YarnRPC.create(conf);
+ String bindAddr = "localhost:0";
+ InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+ conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr);
+ final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
+ final RMContext rmContext = new RMContextImpl() {
+ @Override
+ public AMLivelinessMonitor getAMLivelinessMonitor() {
+ return null;
+ }
+
+ @Override
+ public Configuration getYarnConfiguration() {
+ return new YarnConfiguration();
+ }
+ };
+ Container c = factory.newRecordInstance(Container.class);
+ c.setExecutionType(ExecutionType.OPPORTUNISTIC);
+ c.setId(
+ ContainerId.newContainerId(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(12345, 1), 2), 3));
+ AllocateRequest allReq =
+ (AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class);
+ allReq.setAskList(Arrays.asList(
+ ResourceRequest.newInstance(Priority.UNDEFINED, "a",
+ Resource.newInstance(1, 2), 1, true, "exp",
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true))));
+ OpportunisticContainerAllocatingAMService service =
+ createService(factory, rmContext, c);
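+ // Distributed scheduling is enabled so that the server also exposes
+ // the DistributedSchedulingAMProtocol endpoint.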
+ conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
+ Server server = service.getServer(rpc, conf, addr, null);
+ server.start();
+
+ // Verify that the OpportunisticContainerAllocatingAMService can handle
+ // vanilla ApplicationMasterProtocol clients.
+ RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
+ ProtobufRpcEngine.class);
+ ApplicationMasterProtocolPB ampProxy =
+ RPC.getProxy(ApplicationMasterProtocolPB
+ .class, 1, NetUtils.getConnectAddress(server), conf);
+ RegisterApplicationMasterResponse regResp =
+ new RegisterApplicationMasterResponsePBImpl(
+ ampProxy.registerApplicationMaster(null,
+ ((RegisterApplicationMasterRequestPBImpl)factory
+ .newRecordInstance(
+ RegisterApplicationMasterRequest.class)).getProto()));
+ Assert.assertEquals("dummyQueue", regResp.getQueue());
+ FinishApplicationMasterResponse finishResp =
+ new FinishApplicationMasterResponsePBImpl(
+ ampProxy.finishApplicationMaster(null,
+ ((FinishApplicationMasterRequestPBImpl)factory
+ .newRecordInstance(
+ FinishApplicationMasterRequest.class)).getProto()
+ ));
+ Assert.assertEquals(false, finishResp.getIsUnregistered());
+ AllocateResponse allocResp =
+ new AllocateResponsePBImpl(
+ ampProxy.allocate(null,
+ ((AllocateRequestPBImpl)factory
+ .newRecordInstance(AllocateRequest.class)).getProto())
+ );
+ List<Container> allocatedContainers = allocResp.getAllocatedContainers();
+ Assert.assertEquals(1, allocatedContainers.size());
+ Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+ allocatedContainers.get(0).getExecutionType());
+ Assert.assertEquals(12345, allocResp.getNumClusterNodes());
+
+
+ // Verify that the OpportunisticContainerAllocatingAMService can handle
+ // DistributedSchedulingAMProtocol clients as well.
+ RPC.setProtocolEngine(conf, DistributedSchedulingAMProtocolPB.class,
+ ProtobufRpcEngine.class);
+ DistributedSchedulingAMProtocolPB dsProxy =
+ RPC.getProxy(DistributedSchedulingAMProtocolPB
+ .class, 1, NetUtils.getConnectAddress(server), conf);
+
+ RegisterDistributedSchedulingAMResponse dsRegResp =
+ new RegisterDistributedSchedulingAMResponsePBImpl(
+ dsProxy.registerApplicationMasterForDistributedScheduling(null,
+ ((RegisterApplicationMasterRequestPBImpl)factory
+ .newRecordInstance(RegisterApplicationMasterRequest.class))
+ .getProto()));
+ Assert.assertEquals(54321L, dsRegResp.getContainerIdStart());
+ Assert.assertEquals(4,
+ dsRegResp.getMaxContainerResource().getVirtualCores());
+ Assert.assertEquals(1024,
+ dsRegResp.getMinContainerResource().getMemorySize());
+ Assert.assertEquals(2,
+ dsRegResp.getIncrContainerResource().getVirtualCores());
+
+ DistributedSchedulingAllocateRequestPBImpl distAllReq =
+ (DistributedSchedulingAllocateRequestPBImpl)factory.newRecordInstance(
+ DistributedSchedulingAllocateRequest.class);
+ distAllReq.setAllocateRequest(allReq);
+ distAllReq.setAllocatedContainers(Arrays.asList(c));
+ DistributedSchedulingAllocateResponse dsAllocResp =
+ new DistributedSchedulingAllocateResponsePBImpl(
+ dsProxy.allocateForDistributedScheduling(null,
+ distAllReq.getProto()));
+ Assert.assertEquals(
+ "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
+
+ FinishApplicationMasterResponse dsfinishResp =
+ new FinishApplicationMasterResponsePBImpl(
+ dsProxy.finishApplicationMaster(null,
+ ((FinishApplicationMasterRequestPBImpl) factory
+ .newRecordInstance(FinishApplicationMasterRequest.class))
+ .getProto()));
+ Assert.assertEquals(
+ false, dsfinishResp.getIsUnregistered());
+ }
+
+ private OpportunisticContainerAllocatingAMService createService(
+ final RecordFactory factory, final RMContext rmContext,
+ final Container c) {
+ return new OpportunisticContainerAllocatingAMService(rmContext, null) {
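+ // Stub every protocol method with a canned response so the test
+ // exercises only the RPC wrapping and (de)serialization path.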
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request) throws
+ YarnException, IOException {
+ RegisterApplicationMasterResponse resp = factory.newRecordInstance(
+ RegisterApplicationMasterResponse.class);
+ // Dummy entry to assert that we get this object back.
+ resp.setQueue("dummyQueue");
+ return resp;
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ FinishApplicationMasterResponse resp = factory.newRecordInstance(
+ FinishApplicationMasterResponse.class);
+ // Dummy entry to assert that we get this object back.
+ resp.setIsUnregistered(false);
+ return resp;
+ }
+
+ @Override
+ public AllocateResponse allocate(AllocateRequest request) throws
+ YarnException, IOException {
+ AllocateResponse response = factory.newRecordInstance(
+ AllocateResponse.class);
+ response.setNumClusterNodes(12345);
+ response.setAllocatedContainers(Arrays.asList(c));
+ return response;
+ }
+
+ @Override
+ public RegisterDistributedSchedulingAMResponse
+ registerApplicationMasterForDistributedScheduling(
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
+ RegisterDistributedSchedulingAMResponse resp = factory
+ .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
+ resp.setContainerIdStart(54321L);
+ resp.setMaxContainerResource(Resource.newInstance(4096, 4));
+ resp.setMinContainerResource(Resource.newInstance(1024, 1));
+ resp.setIncrContainerResource(Resource.newInstance(2048, 2));
+ return resp;
+ }
+
+ @Override
+ public DistributedSchedulingAllocateResponse
+ allocateForDistributedScheduling(
+ DistributedSchedulingAllocateRequest request)
+ throws YarnException, IOException {
+ List<ResourceRequest> askList =
+ request.getAllocateRequest().getAskList();
+ List<Container> allocatedContainers = request.getAllocatedContainers();
+ Assert.assertEquals(1, allocatedContainers.size());
+ Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+ allocatedContainers.get(0).getExecutionType());
+ Assert.assertEquals(1, askList.size());
+ Assert.assertTrue(askList.get(0)
+ .getExecutionTypeRequest().getEnforceExecutionType());
+ DistributedSchedulingAllocateResponse resp = factory
+ .newRecordInstance(DistributedSchedulingAllocateResponse.class);
+ resp.setNodesForScheduling(
+ Arrays.asList(NodeId.newInstance("h1", 1234)));
+ return resp;
+ }
+ };
+ }
+}