diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
index ceb6f7e..928437f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
@@ -24,12 +24,17 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@Unstable
public class CSAssignment {
+ public static final CSAssignment NULL_ASSIGNMENT =
+ new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
- final private Resource resource;
+ public static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
+
+ private Resource resource;
private NodeType type;
private RMContainer excessReservation;
private FiCaSchedulerApp application;
@@ -67,6 +72,10 @@ public CSAssignment(Resource resource, NodeType type,
public Resource getResource() {
return resource;
}
+
+ public void setResource(Resource resource) {
+ this.resource = resource;
+ }
public NodeType getType() {
return type;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java
new file mode 100644
index 0000000..f4a3d29
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
+
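+/**
+ * Result state of a container allocation attempt: either the allocation
+ * succeeded, or the application / priority / queue was skipped.
+ */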
+public enum AllocationState {
+ APP_SKIPPED,
+ PRIORITY_SKIPPED,
+ QUEUE_SKIPPED,
+ SUCCEEDED,
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
new file mode 100644
index 0000000..578c6c4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
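+/**
+ * Result of a single container allocation attempt: the allocation state,
+ * the resource and locality type decided on, any container that has to be
+ * un-reserved, whether the result is a reservation, and the created or
+ * reused container.
+ */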
+public class ContainerAllocation {
+ public static final ContainerAllocation PRIORITY_SKIPPED =
+ new ContainerAllocation(null, null, AllocationState.PRIORITY_SKIPPED);
+
+ public static final ContainerAllocation APP_SKIPPED =
+ new ContainerAllocation(null, null, AllocationState.APP_SKIPPED);
+
+ public static final ContainerAllocation QUEUE_SKIPPED =
+ new ContainerAllocation(null, null, AllocationState.QUEUE_SKIPPED);
+
+ RMContainer containerToBeUnreserved;
+ private Resource resourceToBeAllocated = Resources.none();
+ AllocationState state;
+ boolean reserved = false;
+ NodeType containerNodeType = NodeType.NODE_LOCAL;
+ NodeType requestNodeType = NodeType.NODE_LOCAL;
+ Container updatedContainer;
+
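+ /**
+ * Allocation that only un-reserves the given container; the allocation
+ * state defaults to QUEUE_SKIPPED.
+ */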
+ public ContainerAllocation(RMContainer containerToBeUnreserved) {
+ this(containerToBeUnreserved, null, AllocationState.QUEUE_SKIPPED);
+ }
+
+ public ContainerAllocation(RMContainer containerToBeUnreserved,
+ Resource resourceToBeAllocated) {
+ this(containerToBeUnreserved, resourceToBeAllocated,
+ AllocationState.SUCCEEDED);
+ }
+
+ private ContainerAllocation(RMContainer containerToBeUnreserved,
+ Resource resourceToBeAllocated, AllocationState state) {
+ this.containerToBeUnreserved = containerToBeUnreserved;
+ this.resourceToBeAllocated = resourceToBeAllocated;
+ this.state = state;
+ }
+
+ public RMContainer getContainerToBeUnreserved() {
+ return containerToBeUnreserved;
+ }
+
+ public Resource getResourceToBeAllocated() {
+ if (resourceToBeAllocated == null) {
+ return Resources.none();
+ }
+ return resourceToBeAllocated;
+ }
+
+ public AllocationState getAllocationState() {
+ return state;
+ }
+
+ public NodeType getContainerNodeType() {
+ return containerNodeType;
+ }
+
+ public NodeType getRequestNodeType() {
+ return requestNodeType;
+ }
+
+ public boolean getIsReserved() {
+ return reserved;
+ }
+
+ public Container getUpdatedContainer() {
+ return updatedContainer;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
new file mode 100644
index 0000000..7caac6e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * Given an application, its resource limits, and its resource requests,
+ * decides how to allocate containers. This makes the application's
+ * resource allocation logic extensible.
+ */
+public abstract class ContainerAllocator {
+ FiCaSchedulerApp application;
+ final ResourceCalculator rc;
+ final RMContext rmContext;
+
+ public ContainerAllocator(FiCaSchedulerApp application,
+ ResourceCalculator rc, RMContext rmContext) {
+ this.application = application;
+ this.rc = rc;
+ this.rmContext = rmContext;
+ }
+
+ /**
+ * preAllocation performs the checks needed to decide whether a container
+ * can be allocated, and puts the necessary information into the returned
+ * {@link ContainerAllocation}.
+ */
+ abstract ContainerAllocation preAllocation(
+ Resource clusterResource, FiCaSchedulerNode node,
+ SchedulingMode schedulingMode, ResourceLimits resourceLimits,
+ Priority priority, RMContainer reservedContainer);
+
+ /**
+ * doAllocation updates application metrics, creates containers, etc.,
+ * according to the allocation decision made by preAllocation.
+ */
+ abstract ContainerAllocation doAllocation(
+ ContainerAllocation allocationResult, Resource clusterResource,
+ FiCaSchedulerNode node, SchedulingMode schedulingMode, Priority priority,
+ RMContainer reservedContainer);
+
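+ /**
+ * Check whether the required resource fits within the current headroom,
+ * plus any resource that could be un-reserved when reservation
+ * continue-looking is enabled on the default partition.
+ */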
+ boolean checkHeadroom(Resource clusterResource,
+ ResourceLimits currentResourceLimits, Resource required,
+ FiCaSchedulerNode node) {
+ // If headroom + currentReservation < required, we cannot allocate this
+ // request
+ Resource resourceCouldBeUnReserved = application.getCurrentReservation();
+ if (!application.getCSLeafQueue().getReservationContinueLooking()
+ || !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
+ // If reservation continue-looking is disabled, OR we're looking at a
+ // non-default node partition, we don't allow unreserving before
+ // allocation.
+ resourceCouldBeUnReserved = Resources.none();
+ }
+ return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add(
+ currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved),
+ required);
+ }
+
+ /**
+ * allocate handles the following:
+ * <ul>
+ * <li>Select request: select a request to allocate, e.g. a resource
+ * request based on requirement/priority/locality.</li>
+ * <li>Check whether the selected resource can be allocated given current
+ * resource availability.</li>
+ * <li>Do allocation: decide on and create the allocated/reserved
+ * container, and update metrics.</li>
+ * </ul>
+ */
+ public ContainerAllocation allocate(Resource clusterResource,
+ FiCaSchedulerNode node, SchedulingMode schedulingMode,
+ ResourceLimits resourceLimits, Priority priority,
+ RMContainer reservedContainer) {
+ ContainerAllocation result =
+ preAllocation(clusterResource, node, schedulingMode,
+ resourceLimits, priority, reservedContainer);
+
+ if (AllocationState.SUCCEEDED == result.state) {
+ result = doAllocation(result, clusterResource, node,
+ schedulingMode, priority, reservedContainer);
+ }
+
+ return result;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
new file mode 100644
index 0000000..1b96a46
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -0,0 +1,605 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * Allocates normal (new) containers, taking locality, node labels, etc.
+ * into account. Uses the delay-scheduling mechanism to achieve better
+ * locality.
+ */
+public class RegularContainerAllocator extends ContainerAllocator {
+ private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class);
+
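+ // Resource request selected by the most recent assignContainer() call;
+ // used later when informing the application of the allocation.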
+ private ResourceRequest lastResourceRequest = null;
+
+ public RegularContainerAllocator(FiCaSchedulerApp application,
+ ResourceCalculator rc, RMContext rmContext) {
+ super(application, rc, rmContext);
+ }
+
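+ /**
+ * Pre-checks for allocating a new (non-reserved) container: blacklist,
+ * outstanding requests, partition matching, headroom, and scheduling
+ * opportunities. Returns a skip result if allocation cannot proceed, or
+ * null if all checks pass.
+ */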
+ private ContainerAllocation preCheckForNewContainer(Resource clusterResource,
+ FiCaSchedulerNode node, SchedulingMode schedulingMode,
+ ResourceLimits resourceLimits, Priority priority) {
+ if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
+ return ContainerAllocation.APP_SKIPPED;
+ }
+
+ ResourceRequest anyRequest =
+ application.getResourceRequest(priority, ResourceRequest.ANY);
+ if (null == anyRequest) {
+ return ContainerAllocation.PRIORITY_SKIPPED;
+ }
+
+ // Required resource
+ Resource required = anyRequest.getCapability();
+
+ // Do we need containers at this 'priority'?
+ if (application.getTotalRequiredResources(priority) <= 0) {
+ return ContainerAllocation.PRIORITY_SKIPPED;
+ }
+
+ // AM container allocation doesn't support non-exclusive allocation, to
+ // avoid the pain of preempting an AM container
+ if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+ RMAppAttempt rmAppAttempt =
+ rmContext.getRMApps().get(application.getApplicationId())
+ .getCurrentAppAttempt();
+ if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false
+ && null == rmAppAttempt.getMasterContainer()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip allocating AM container to app_attempt="
+ + application.getApplicationAttemptId()
+ + ", don't allow to allocate AM container in non-exclusive mode");
+ }
+ return ContainerAllocation.APP_SKIPPED;
+ }
+ }
+
+ // Does the node-label-expression of this off-switch resource request
+ // match the node's partition?
+ // If not, skip to the next priority.
+ if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(anyRequest,
+ node.getPartition(), schedulingMode)) {
+ return ContainerAllocation.PRIORITY_SKIPPED;
+ }
+
+ if (!application.getCSLeafQueue().getReservationContinueLooking()) {
+ if (!shouldAllocOrReserveNewContainer(priority, required)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("doesn't need containers based on reservation algo!");
+ }
+ return ContainerAllocation.PRIORITY_SKIPPED;
+ }
+ }
+
+ if (!checkHeadroom(clusterResource, resourceLimits, required, node)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("cannot allocate required resource=" + required
+ + " because of headroom");
+ }
+ return ContainerAllocation.QUEUE_SKIPPED;
+ }
+
+ // Inform the application it is about to get a scheduling opportunity
+ application.addSchedulingOpportunity(priority);
+
+ // Increase missed-non-partitioned-resource-request-opportunity.
+ // This is to make sure non-partitioned resource requests are
+ // preferentially allocated to non-partitioned nodes
+ int missedNonPartitionedRequestSchedulingOpportunity = 0;
+ if (anyRequest.getNodeLabelExpression()
+ .equals(RMNodeLabelsManager.NO_LABEL)) {
+ missedNonPartitionedRequestSchedulingOpportunity =
+ application
+ .addMissedNonPartitionedRequestSchedulingOpportunity(priority);
+ }
+
+ if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+ // Before doing allocation, check the scheduling opportunity to make sure
+ // non-partitioned resource requests are scheduled to the non-partitioned
+ // partition first.
+ if (missedNonPartitionedRequestSchedulingOpportunity < rmContext
+ .getScheduler().getNumClusterNodes()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
+ + " priority=" + priority
+ + " because missed-non-partitioned-resource-request"
+ + " opportunity under requred:" + " Now="
+ + missedNonPartitionedRequestSchedulingOpportunity + " required="
+ + rmContext.getScheduler().getNumClusterNodes());
+ }
+
+ return ContainerAllocation.APP_SKIPPED;
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ ContainerAllocation preAllocation(Resource clusterResource,
+ FiCaSchedulerNode node, SchedulingMode schedulingMode,
+ ResourceLimits resourceLimits, Priority priority,
+ RMContainer reservedContainer) {
+ ContainerAllocation result;
+ if (null == reservedContainer) {
+ // pre-check when allocating new container
+ result =
+ preCheckForNewContainer(clusterResource, node, schedulingMode,
+ resourceLimits, priority);
+ if (null != result) {
+ return result;
+ }
+ } else {
+ // pre-check when allocating reserved container
+ if (application.getTotalRequiredResources(priority) == 0) {
+ // Release
+ return new ContainerAllocation(reservedContainer);
+ }
+ }
+
+ // Try to allocate containers on node
+ result =
+ assignContainersOnNode(clusterResource, node, priority,
+ reservedContainer, schedulingMode, resourceLimits);
+
+ if (null == reservedContainer) {
+ if (result.state == AllocationState.PRIORITY_SKIPPED) {
+ // Don't count 'skipped nodes' as a scheduling opportunity!
+ application.subtractSchedulingOpportunity(priority);
+ }
+ }
+
+ return result;
+ }
+
+ public synchronized float getLocalityWaitFactor(
+ Priority priority, int clusterNodes) {
+ // Estimate: Required unique resources (i.e. hosts + racks)
+ int requiredResources =
+ Math.max(application.getResourceRequests(priority).size() - 1, 0);
+
+ // waitFactor can't be more than '1'
+ // i.e. no point skipping more than clustersize opportunities
+ return Math.min(((float)requiredResources / clusterNodes), 1.0f);
+ }
+
+ private int getActualNodeLocalityDelay() {
+ return Math.min(rmContext.getScheduler().getNumClusterNodes(), application
+ .getCSLeafQueue().getNodeLocalityDelay());
+ }
+
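+ /**
+ * Decide whether a container of the given locality type can be assigned
+ * on this node, applying the delay-scheduling rules for rack-local and
+ * off-switch requests.
+ */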
+ private boolean canAssign(Priority priority, FiCaSchedulerNode node,
+ NodeType type, RMContainer reservedContainer) {
+
+ // Clearly we need containers for this application...
+ if (type == NodeType.OFF_SWITCH) {
+ if (reservedContainer != null) {
+ return true;
+ }
+
+ // 'Delay' off-switch
+ ResourceRequest offSwitchRequest =
+ application.getResourceRequest(priority, ResourceRequest.ANY);
+ long missedOpportunities = application.getSchedulingOpportunities(priority);
+ long requiredContainers = offSwitchRequest.getNumContainers();
+
+ float localityWaitFactor =
+ getLocalityWaitFactor(priority, rmContext.getScheduler()
+ .getNumClusterNodes());
+
+ return ((requiredContainers * localityWaitFactor) < missedOpportunities);
+ }
+
+ // Check if we need containers on this rack
+ ResourceRequest rackLocalRequest =
+ application.getResourceRequest(priority, node.getRackName());
+ if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
+ return false;
+ }
+
+ // If we are here, we do need containers on this rack for RACK_LOCAL req
+ if (type == NodeType.RACK_LOCAL) {
+ // 'Delay' rack-local just a little bit...
+ long missedOpportunities = application.getSchedulingOpportunities(priority);
+ return getActualNodeLocalityDelay() < missedOpportunities;
+ }
+
+ // Check if we need containers on this host
+ if (type == NodeType.NODE_LOCAL) {
+ // Now check if we need containers on this host...
+ ResourceRequest nodeLocalRequest =
+ application.getResourceRequest(priority, node.getNodeName());
+ if (nodeLocalRequest != null) {
+ return nodeLocalRequest.getNumContainers() > 0;
+ }
+ }
+
+ return false;
+ }
+
+ private ContainerAllocation assignNodeLocalContainers(
+ Resource clusterResource, ResourceRequest nodeLocalResourceRequest,
+ FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
+ SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
+ if (canAssign(priority, node, NodeType.NODE_LOCAL, reservedContainer)) {
+ return assignContainer(clusterResource, node, priority,
+ nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
+ schedulingMode, currentResoureLimits);
+ }
+
+ return ContainerAllocation.QUEUE_SKIPPED;
+ }
+
+ private ContainerAllocation assignRackLocalContainers(
+ Resource clusterResource, ResourceRequest rackLocalResourceRequest,
+ FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
+ SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
+ if (canAssign(priority, node, NodeType.RACK_LOCAL, reservedContainer)) {
+ return assignContainer(clusterResource, node, priority,
+ rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
+ schedulingMode, currentResoureLimits);
+ }
+
+ return ContainerAllocation.QUEUE_SKIPPED;
+ }
+
+ private ContainerAllocation assignOffSwitchContainers(
+ Resource clusterResource, ResourceRequest offSwitchResourceRequest,
+ FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
+ SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
+ if (canAssign(priority, node, NodeType.OFF_SWITCH, reservedContainer)) {
+ return assignContainer(clusterResource, node, priority,
+ offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
+ schedulingMode, currentResoureLimits);
+ }
+
+ return ContainerAllocation.QUEUE_SKIPPED;
+ }
+
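+ /**
+ * Try to assign a container on this node, preferring node-local, then
+ * rack-local, then off-switch requests, and honoring the relax-locality
+ * flags of the requests.
+ */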
+ private ContainerAllocation assignContainersOnNode(Resource clusterResource,
+ FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
+ SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
+
+ ContainerAllocation assigned;
+
+ NodeType requestType = null;
+ // Data-local
+ ResourceRequest nodeLocalResourceRequest =
+ application.getResourceRequest(priority, node.getNodeName());
+ if (nodeLocalResourceRequest != null) {
+ requestType = NodeType.NODE_LOCAL;
+ assigned =
+ assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
+ node, priority, reservedContainer, schedulingMode,
+ currentResoureLimits);
+ if (Resources.greaterThan(rc, clusterResource,
+ assigned.getResourceToBeAllocated(), Resources.none())) {
+ assigned.requestNodeType = requestType;
+ return assigned;
+ }
+ }
+
+ // Rack-local
+ ResourceRequest rackLocalResourceRequest =
+ application.getResourceRequest(priority, node.getRackName());
+ if (rackLocalResourceRequest != null) {
+ if (!rackLocalResourceRequest.getRelaxLocality()) {
+ return ContainerAllocation.PRIORITY_SKIPPED;
+ }
+
+ if (requestType != NodeType.NODE_LOCAL) {
+ requestType = NodeType.RACK_LOCAL;
+ }
+
+ assigned =
+ assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
+ node, priority, reservedContainer, schedulingMode,
+ currentResoureLimits);
+ if (Resources.greaterThan(rc, clusterResource,
+ assigned.getResourceToBeAllocated(), Resources.none())) {
+ assigned.requestNodeType = requestType;
+ return assigned;
+ }
+ }
+
+ // Off-switch
+ ResourceRequest offSwitchResourceRequest =
+ application.getResourceRequest(priority, ResourceRequest.ANY);
+ if (offSwitchResourceRequest != null) {
+ if (!offSwitchResourceRequest.getRelaxLocality()) {
+ return ContainerAllocation.PRIORITY_SKIPPED;
+ }
+ if (requestType != NodeType.NODE_LOCAL
+ && requestType != NodeType.RACK_LOCAL) {
+ requestType = NodeType.OFF_SWITCH;
+ }
+
+ assigned =
+ assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
+ node, priority, reservedContainer, schedulingMode,
+ currentResoureLimits);
+ assigned.requestNodeType = requestType;
+
+ return assigned;
+ }
+
+ return ContainerAllocation.PRIORITY_SKIPPED;
+ }
+
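+ /**
+ * Try to allocate or reserve a single container for the given request on
+ * this node, un-reserving other containers first if that is required and
+ * allowed.
+ */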
+ private ContainerAllocation assignContainer(Resource clusterResource,
+ FiCaSchedulerNode node, Priority priority, ResourceRequest request,
+ NodeType type, RMContainer rmContainer, SchedulingMode schedulingMode,
+ ResourceLimits currentResoureLimits) {
+ lastResourceRequest = request;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("assignContainers: node=" + node.getNodeName()
+ + " application=" + application.getApplicationId()
+ + " priority=" + priority.getPriority()
+ + " request=" + request + " type=" + type);
+ }
+
+ // check if the resource request can access the label
+ if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request,
+ node.getPartition(), schedulingMode)) {
+ // This is a reserved container, but we cannot allocate it now because
+ // its label no longer matches; this can be caused by a node label change.
+ // We should un-reserve this container.
+ return new ContainerAllocation(rmContainer);
+ }
+
+ Resource capability = request.getCapability();
+ Resource available = node.getAvailableResource();
+ Resource totalResource = node.getTotalResource();
+
+ if (!Resources.lessThanOrEqual(rc, clusterResource,
+ capability, totalResource)) {
+ LOG.warn("Node : " + node.getNodeID()
+ + " does not have sufficient resource for request : " + request
+ + " node total capability : " + node.getTotalResource());
+ return ContainerAllocation.QUEUE_SKIPPED;
+ }
+
+ assert Resources.greaterThan(
+ rc, clusterResource, available, Resources.none());
+
+ boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
+ priority, capability);
+
+ // Can we allocate a container on this node?
+ int availableContainers =
+ rc.computeAvailableContainers(available, capability);
+
+ // The amount we need to unreserve equals:
+ // max(required - headroom, amountNeedUnreserve)
+ Resource resourceNeedToUnReserve =
+ Resources.max(rc, clusterResource,
+ Resources.subtract(capability, currentResoureLimits.getHeadroom()),
+ currentResoureLimits.getAmountNeededUnreserve());
+
+ boolean needToUnreserve =
+ Resources.greaterThan(rc, clusterResource,
+ resourceNeedToUnReserve, Resources.none());
+
+ RMContainer unreservedContainer = null;
+ boolean reservationsContinueLooking =
+ application.getCSLeafQueue().getReservationContinueLooking();
+
+ if (availableContainers > 0) {
+ // Allocate...
+ // We will only do continuous reservation when this is not allocated from
+ // a reserved container
+ if (rmContainer == null && reservationsContinueLooking
+ && node.getLabels().isEmpty()) {
+ // when reservationsContinueLooking is set, we may need to unreserve
+ // some containers to meet this queue, its parents', or the users'
+ // resource limits.
+ // TODO, need change here when we want to support continuous reservation
+ // looking for labeled partitions.
+ if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
+ if (!needToUnreserve) {
+ // If we shouldn't allocate/reserve a new container, then we should
+ // unreserve one of the same size we are asking for, since
+ // currentResoureLimits.getAmountNeededUnreserve could be zero. If
+ // the limit was hit, then use the amount we need to unreserve to get
+ // under the limit.
+ resourceNeedToUnReserve = capability;
+ }
+ unreservedContainer =
+ application.findNodeToUnreserve(clusterResource, node, priority,
+ resourceNeedToUnReserve);
+ // We get here when minimum-unreserved-resource > 0 OR we cannot
+ // allocate a new/reserved container (that means we *have to* unreserve
+ // some resource to continue). If we failed to unreserve some resource,
+ // we can't continue.
+ if (null == unreservedContainer) {
+ return ContainerAllocation.QUEUE_SKIPPED;
+ }
+ }
+ }
+
+ ContainerAllocation result =
+ new ContainerAllocation(unreservedContainer, request.getCapability());
+ result.reserved = false;
+ result.containerNodeType = type;
+ return result;
+ } else {
+ // If we are allowed to allocate but this node doesn't have space,
+ // reserve it; or if this was already a reserved container, reserve it
+ // again.
+ if (shouldAllocOrReserveNewContainer || rmContainer != null) {
+
+ if (reservationsContinueLooking && rmContainer == null) {
+ // We could possibly be ignoring queue capacity or user limits when
+ // reservationsContinueLooking is set. Make sure we didn't need to
+ // unreserve one.
+ if (needToUnreserve) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("we needed to unreserve to be able to allocate");
+ }
+ return ContainerAllocation.QUEUE_SKIPPED;
+ }
+ }
+
+ ContainerAllocation result =
+ new ContainerAllocation(null, request.getCapability());
+ result.reserved = true;
+ result.containerNodeType = type;
+ return result;
+ }
+ return ContainerAllocation.QUEUE_SKIPPED;
+ }
+ }
+
+ boolean shouldAllocOrReserveNewContainer(Priority priority,
+ Resource required) {
+ int requiredContainers = application.getTotalRequiredResources(priority);
+ int reservedContainers = application.getNumReservedContainers(priority);
+ int starvation = 0;
+ if (reservedContainers > 0) {
+ float nodeFactor =
+ Resources
+ .ratio(rc, required, application.getCSLeafQueue().getMaximumAllocation());
+
+ // Use percentage of node required to bias against large containers...
+ // Protect against corner case where you need the whole node with
+ // Math.min(nodeFactor, minimumAllocationFactor)
+ starvation =
+ (int) ((application.getReReservations(priority) /
+ (float) reservedContainers) * (1.0f - (Math.min(
+ nodeFactor, application.getCSLeafQueue()
+ .getMinimumAllocationFactor()))));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("needsContainers:" + " app.#re-reserve="
+ + application.getReReservations(priority) + " reserved="
+ + reservedContainers + " nodeFactor=" + nodeFactor
+ + " minAllocFactor="
+ + application.getCSLeafQueue().getMinimumAllocationFactor()
+ + " starvation=" + starvation);
+ }
+ }
+ return (((starvation + requiredContainers) - reservedContainers) > 0);
+ }
+
+ private Container getContainer(RMContainer rmContainer,
+ FiCaSchedulerNode node, Resource capability, Priority priority) {
+ return (rmContainer != null) ? rmContainer.getContainer()
+ : createContainer(node, capability, priority);
+ }
+
+ private Container createContainer(FiCaSchedulerNode node, Resource capability,
+ Priority priority) {
+
+ NodeId nodeId = node.getRMNode().getNodeID();
+ ContainerId containerId =
+ BuilderUtils.newContainerId(application.getApplicationAttemptId(),
+ application.getNewContainerId());
+
+ // Create the container
+ return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
+ .getHttpAddress(), capability, priority, null);
+ }
+
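+ /**
+ * Commit a new container allocation: release any prior reservation at
+ * this priority, inform the application and the node, and update locality
+ * statistics.
+ */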
+ private ContainerAllocation handleNewContainerAllocation(
+ ContainerAllocation allocationResult, Resource clusterResource,
+ FiCaSchedulerNode node, SchedulingMode schedulingMode, Priority priority,
+ RMContainer reservedContainer, Container container) {
+ // Handling container allocation
+ // Did we previously reserve containers at this 'priority'?
+ if (reservedContainer != null) {
+ application.unreserve(priority, node, reservedContainer);
+ }
+
+ // Inform the application
+ RMContainer allocatedContainer =
+ application.allocate(allocationResult.containerNodeType, node,
+ priority, lastResourceRequest, container);
+
+ // Does the application need this resource?
+ if (allocatedContainer == null) {
+ // Skip this app if we failed to allocate.
+ ContainerAllocation ret =
+ new ContainerAllocation(allocationResult.containerToBeUnreserved);
+ ret.state = AllocationState.APP_SKIPPED;
+ return ret;
+ }
+
+ // Inform the node
+ node.allocateContainer(allocatedContainer);
+
+ // update locality statistics
+ application.incNumAllocatedContainers(allocationResult.containerNodeType,
+ allocationResult.requestNodeType);
+
+ return allocationResult;
+ }
+
+ @Override
+ ContainerAllocation doAllocation(ContainerAllocation allocationResult,
+ Resource clusterResource, FiCaSchedulerNode node,
+ SchedulingMode schedulingMode, Priority priority,
+ RMContainer reservedContainer) {
+ // Create the container if necessary
+ Container container =
+ getContainer(reservedContainer, node,
+ allocationResult.getResourceToBeAllocated(), priority);
+
+ // something went wrong getting/creating the container
+ if (container == null) {
+ LOG.warn("Couldn't get container for allocation!");
+ return ContainerAllocation.QUEUE_SKIPPED;
+ }
+
+ if (!allocationResult.reserved) {
+ // When allocating container
+ allocationResult =
+ handleNewContainerAllocation(allocationResult, clusterResource, node,
+ schedulingMode, priority, reservedContainer, container);
+ } else {
+ // When reserving container
+ application.reserve(priority, node, reservedContainer, container);
+ }
+ allocationResult.updatedContainer = container;
+
+ if (allocationResult.state == AllocationState.SUCCEEDED) {
+ // Don't reset scheduling opportunities for offswitch assignments
+ // otherwise the app will be delayed for each non-local assignment.
+ // This helps apps with many off-cluster requests schedule faster.
+ if (allocationResult.containerNodeType != NodeType.OFF_SWITCH) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Resetting scheduling opportunities");
+ }
+ application.resetSchedulingOpportunities(priority);
+ }
+
+ // Non-exclusive scheduling opportunity is different: we need to reset
+ // it every time to make sure non-labeled resource requests will most
+ // likely be allocated on non-labeled nodes first.
+ application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
+ }
+
+ return allocationResult;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index c660fcb..f4854c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -24,7 +24,6 @@
import java.util.Map;
import java.util.Set;
-import org.apache.commons.lang.mutable.MutableObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -40,9 +39,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -54,15 +51,16 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.RegularContainerAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocation;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -78,11 +76,6 @@
public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);
- static final CSAssignment NULL_ASSIGNMENT =
- new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
-
- static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
-
private final Set<ContainerId> containersToPreempt =
new HashSet<ContainerId>();
@@ -91,6 +84,8 @@
private ResourceCalculator rc = new DefaultResourceCalculator();
private ResourceScheduler scheduler;
+
+ private ContainerAllocator applicationContainerAllocator;
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
@@ -124,6 +119,8 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
if (scheduler.getResourceCalculator() != null) {
rc = scheduler.getResourceCalculator();
}
+
+ applicationContainerAllocator = new RegularContainerAllocator(this, rc, rmContext);
}
synchronized public boolean containerCompleted(RMContainer rmContainer,
@@ -386,223 +383,6 @@ public synchronized void transferStateFromPreviousAttempt(
((FiCaSchedulerApp) appAttempt).getHeadroomProvider();
}
- private int getActualNodeLocalityDelay() {
- return Math.min(scheduler.getNumClusterNodes(), getCSLeafQueue()
- .getNodeLocalityDelay());
- }
-
- private boolean canAssign(Priority priority, FiCaSchedulerNode node,
- NodeType type, RMContainer reservedContainer) {
-
- // Clearly we need containers for this application...
- if (type == NodeType.OFF_SWITCH) {
- if (reservedContainer != null) {
- return true;
- }
-
- // 'Delay' off-switch
- ResourceRequest offSwitchRequest =
- getResourceRequest(priority, ResourceRequest.ANY);
- long missedOpportunities = getSchedulingOpportunities(priority);
- long requiredContainers = offSwitchRequest.getNumContainers();
-
- float localityWaitFactor =
- getLocalityWaitFactor(priority, scheduler.getNumClusterNodes());
-
- return ((requiredContainers * localityWaitFactor) < missedOpportunities);
- }
-
- // Check if we need containers on this rack
- ResourceRequest rackLocalRequest =
- getResourceRequest(priority, node.getRackName());
- if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
- return false;
- }
-
- // If we are here, we do need containers on this rack for RACK_LOCAL req
- if (type == NodeType.RACK_LOCAL) {
- // 'Delay' rack-local just a little bit...
- long missedOpportunities = getSchedulingOpportunities(priority);
- return getActualNodeLocalityDelay() < missedOpportunities;
- }
-
- // Check if we need containers on this host
- if (type == NodeType.NODE_LOCAL) {
- // Now check if we need containers on this host...
- ResourceRequest nodeLocalRequest =
- getResourceRequest(priority, node.getNodeName());
- if (nodeLocalRequest != null) {
- return nodeLocalRequest.getNumContainers() > 0;
- }
- }
-
- return false;
- }
-
- boolean
- shouldAllocOrReserveNewContainer(Priority priority, Resource required) {
- int requiredContainers = getTotalRequiredResources(priority);
- int reservedContainers = getNumReservedContainers(priority);
- int starvation = 0;
- if (reservedContainers > 0) {
- float nodeFactor =
- Resources.ratio(
- rc, required, getCSLeafQueue().getMaximumAllocation()
- );
-
- // Use percentage of node required to bias against large containers...
- // Protect against corner case where you need the whole node with
- // Math.min(nodeFactor, minimumAllocationFactor)
- starvation =
- (int)((getReReservations(priority) / (float)reservedContainers) *
- (1.0f - (Math.min(nodeFactor, getCSLeafQueue().getMinimumAllocationFactor())))
- );
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("needsContainers:" +
- " app.#re-reserve=" + getReReservations(priority) +
- " reserved=" + reservedContainers +
- " nodeFactor=" + nodeFactor +
- " minAllocFactor=" + getCSLeafQueue().getMinimumAllocationFactor() +
- " starvation=" + starvation);
- }
- }
- return (((starvation + requiredContainers) - reservedContainers) > 0);
- }
-
- private CSAssignment assignNodeLocalContainers(Resource clusterResource,
- ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
- Priority priority,
- RMContainer reservedContainer, MutableObject allocatedContainer,
- SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
- if (canAssign(priority, node, NodeType.NODE_LOCAL,
- reservedContainer)) {
- return assignContainer(clusterResource, node, priority,
- nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
- allocatedContainer, schedulingMode, currentResoureLimits);
- }
-
- return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
- }
-
- private CSAssignment assignRackLocalContainers(Resource clusterResource,
- ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
- Priority priority,
- RMContainer reservedContainer, MutableObject allocatedContainer,
- SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
- if (canAssign(priority, node, NodeType.RACK_LOCAL,
- reservedContainer)) {
- return assignContainer(clusterResource, node, priority,
- rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
- allocatedContainer, schedulingMode, currentResoureLimits);
- }
-
- return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL);
- }
-
- private CSAssignment assignOffSwitchContainers(Resource clusterResource,
- ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
- Priority priority,
- RMContainer reservedContainer, MutableObject allocatedContainer,
- SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
- if (canAssign(priority, node, NodeType.OFF_SWITCH,
- reservedContainer)) {
- return assignContainer(clusterResource, node, priority,
- offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
- allocatedContainer, schedulingMode, currentResoureLimits);
- }
-
- return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH);
- }
-
- private CSAssignment assignContainersOnNode(Resource clusterResource,
- FiCaSchedulerNode node, Priority priority,
- RMContainer reservedContainer, SchedulingMode schedulingMode,
- ResourceLimits currentResoureLimits) {
-
- CSAssignment assigned;
-
- NodeType requestType = null;
- MutableObject allocatedContainer = new MutableObject();
- // Data-local
- ResourceRequest nodeLocalResourceRequest =
- getResourceRequest(priority, node.getNodeName());
- if (nodeLocalResourceRequest != null) {
- requestType = NodeType.NODE_LOCAL;
- assigned =
- assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
- node, priority, reservedContainer,
- allocatedContainer, schedulingMode, currentResoureLimits);
- if (Resources.greaterThan(rc, clusterResource,
- assigned.getResource(), Resources.none())) {
-
- //update locality statistics
- if (allocatedContainer.getValue() != null) {
- incNumAllocatedContainers(NodeType.NODE_LOCAL,
- requestType);
- }
- assigned.setType(NodeType.NODE_LOCAL);
- return assigned;
- }
- }
-
- // Rack-local
- ResourceRequest rackLocalResourceRequest =
- getResourceRequest(priority, node.getRackName());
- if (rackLocalResourceRequest != null) {
- if (!rackLocalResourceRequest.getRelaxLocality()) {
- return SKIP_ASSIGNMENT;
- }
-
- if (requestType != NodeType.NODE_LOCAL) {
- requestType = NodeType.RACK_LOCAL;
- }
-
- assigned =
- assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
- node, priority, reservedContainer,
- allocatedContainer, schedulingMode, currentResoureLimits);
- if (Resources.greaterThan(rc, clusterResource,
- assigned.getResource(), Resources.none())) {
-
- //update locality statistics
- if (allocatedContainer.getValue() != null) {
- incNumAllocatedContainers(NodeType.RACK_LOCAL,
- requestType);
- }
- assigned.setType(NodeType.RACK_LOCAL);
- return assigned;
- }
- }
-
- // Off-switch
- ResourceRequest offSwitchResourceRequest =
- getResourceRequest(priority, ResourceRequest.ANY);
- if (offSwitchResourceRequest != null) {
- if (!offSwitchResourceRequest.getRelaxLocality()) {
- return SKIP_ASSIGNMENT;
- }
- if (requestType != NodeType.NODE_LOCAL
- && requestType != NodeType.RACK_LOCAL) {
- requestType = NodeType.OFF_SWITCH;
- }
-
- assigned =
- assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
- node, priority, reservedContainer,
- allocatedContainer, schedulingMode, currentResoureLimits);
-
- // update locality statistics
- if (allocatedContainer.getValue() != null) {
- incNumAllocatedContainers(NodeType.OFF_SWITCH, requestType);
- }
- assigned.setType(NodeType.OFF_SWITCH);
- return assigned;
- }
-
- return SKIP_ASSIGNMENT;
- }
-
public void reserve(Priority priority,
FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
// Update reserved metrics if this is the first reservation
@@ -618,25 +398,6 @@ public void reserve(Priority priority,
node.reserveResource(this, priority, rmContainer);
}
- private Container getContainer(RMContainer rmContainer,
- FiCaSchedulerNode node, Resource capability, Priority priority) {
- return (rmContainer != null) ? rmContainer.getContainer()
- : createContainer(node, capability, priority);
- }
-
- Container createContainer(FiCaSchedulerNode node, Resource capability,
- Priority priority) {
-
- NodeId nodeId = node.getRMNode().getNodeID();
- ContainerId containerId =
- BuilderUtils.newContainerId(getApplicationAttemptId(),
- getNewContainerId());
-
- // Create the container
- return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
- .getHttpAddress(), capability, priority, null);
- }
-
@VisibleForTesting
public RMContainer findNodeToUnreserve(Resource clusterResource,
FiCaSchedulerNode node, Priority priority,
@@ -672,203 +433,63 @@ public RMContainer findNodeToUnreserve(Resource clusterResource,
return nodeToUnreserve.getReservedContainer();
}
- private LeafQueue getCSLeafQueue() {
+ public LeafQueue getCSLeafQueue() {
return (LeafQueue)queue;
}
- private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node,
- Priority priority,
- ResourceRequest request, NodeType type, RMContainer rmContainer,
- MutableObject createdContainer, SchedulingMode schedulingMode,
- ResourceLimits currentResoureLimits) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("assignContainers: node=" + node.getNodeName()
- + " application=" + getApplicationId()
- + " priority=" + priority.getPriority()
- + " request=" + request + " type=" + type);
- }
-
- // check if the resource request can access the label
- if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request,
- node.getPartition(), schedulingMode)) {
- // this is a reserved container, but we cannot allocate it now according
- // to label not match. This can be caused by node label changed
- // We should un-reserve this container.
- if (rmContainer != null) {
- unreserve(priority, node, rmContainer);
- }
- return new CSAssignment(Resources.none(), type);
- }
-
- Resource capability = request.getCapability();
- Resource available = node.getAvailableResource();
- Resource totalResource = node.getTotalResource();
-
- if (!Resources.lessThanOrEqual(rc, clusterResource,
- capability, totalResource)) {
- LOG.warn("Node : " + node.getNodeID()
- + " does not have sufficient resource for request : " + request
- + " node total capability : " + node.getTotalResource());
- return new CSAssignment(Resources.none(), type);
- }
-
- assert Resources.greaterThan(
- rc, clusterResource, available, Resources.none());
-
- // Create the container if necessary
- Container container =
- getContainer(rmContainer, node, capability, priority);
-
- // something went wrong getting/creating the container
- if (container == null) {
- LOG.warn("Couldn't get container for allocation!");
- return new CSAssignment(Resources.none(), type);
- }
-
- boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
- priority, capability);
-
- // Can we allocate a container on this node?
- int availableContainers =
- rc.computeAvailableContainers(available, capability);
-
- // How much need to unreserve equals to:
- // max(required - headroom, amountNeedUnreserve)
- Resource resourceNeedToUnReserve =
- Resources.max(rc, clusterResource,
- Resources.subtract(capability, currentResoureLimits.getHeadroom()),
- currentResoureLimits.getAmountNeededUnreserve());
-
- boolean needToUnreserve =
- Resources.greaterThan(rc, clusterResource,
- resourceNeedToUnReserve, Resources.none());
-
- RMContainer unreservedContainer = null;
- boolean reservationsContinueLooking =
- getCSLeafQueue().getReservationContinueLooking();
-
- if (availableContainers > 0) {
- // Allocate...
-
- // Did we previously reserve containers at this 'priority'?
- if (rmContainer != null) {
- unreserve(priority, node, rmContainer);
- } else if (reservationsContinueLooking && node.getLabels().isEmpty()) {
- // when reservationsContinueLooking is set, we may need to unreserve
- // some containers to meet this queue, its parents', or the users' resource limits.
- // TODO, need change here when we want to support continuous reservation
- // looking for labeled partitions.
- if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
- if (!needToUnreserve) {
- // If we shouldn't allocate/reserve new container then we should
- // unreserve one the same size we are asking for since the
- // currentResoureLimits.getAmountNeededUnreserve could be zero. If
- // the limit was hit then use the amount we need to unreserve to be
- // under the limit.
- resourceNeedToUnReserve = capability;
- }
- unreservedContainer =
- findNodeToUnreserve(clusterResource, node, priority,
- resourceNeedToUnReserve);
- // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved
- // container (That means we *have to* unreserve some resource to
- // continue)). If we failed to unreserve some resource, we can't continue.
- if (null == unreservedContainer) {
- return new CSAssignment(Resources.none(), type);
- }
- }
- }
-
- // Inform the application
- RMContainer allocatedContainer =
- allocate(type, node, priority, request, container);
-
- // Does the application need this resource?
- if (allocatedContainer == null) {
- CSAssignment csAssignment = new CSAssignment(Resources.none(), type);
- csAssignment.setApplication(this);
- csAssignment.setExcessReservation(unreservedContainer);
- return csAssignment;
- }
-
- // Inform the node
- node.allocateContainer(allocatedContainer);
-
- // Inform the ordering policy
- getCSLeafQueue().getOrderingPolicy().containerAllocated(this,
- allocatedContainer);
-
- LOG.info("assignedContainer" +
- " application attempt=" + getApplicationAttemptId() +
- " container=" + container +
- " queue=" + this +
- " clusterResource=" + clusterResource);
- createdContainer.setValue(allocatedContainer);
- CSAssignment assignment = new CSAssignment(container.getResource(), type);
- assignment.getAssignmentInformation().addAllocationDetails(
- container.getId(), getCSLeafQueue().getQueuePath());
- assignment.getAssignmentInformation().incrAllocations();
- assignment.setApplication(this);
- Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
- container.getResource());
-
- assignment.setExcessReservation(unreservedContainer);
- return assignment;
- } else {
- // if we are allowed to allocate but this node doesn't have space, reserve it or
- // if this was an already a reserved container, reserve it again
- if (shouldAllocOrReserveNewContainer || rmContainer != null) {
-
- if (reservationsContinueLooking && rmContainer == null) {
- // we could possibly ignoring queue capacity or user limits when
- // reservationsContinueLooking is set. Make sure we didn't need to unreserve
- // one.
- if (needToUnreserve) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("we needed to unreserve to be able to allocate");
- }
- return new CSAssignment(Resources.none(), type);
- }
- }
-
- // Reserve by 'charging' in advance...
- reserve(priority, node, rmContainer, container);
-
- LOG.info("Reserved container " +
- " application=" + getApplicationId() +
- " resource=" + request.getCapability() +
- " queue=" + this.toString() +
- " cluster=" + clusterResource);
- CSAssignment assignment =
- new CSAssignment(request.getCapability(), type);
+ private CSAssignment getCSAssignmentFromAllocateResult(
+ Resource clusterResource, ContainerAllocation result) {
+ // Handle skipped
+ boolean skipped =
+ (result.getAllocationState() == AllocationState.APP_SKIPPED);
+ CSAssignment assignment = new CSAssignment(skipped);
+ assignment.setApplication(this);
+
+ // Handle excess reservation
+ assignment.setExcessReservation(result.getContainerToBeUnreserved());
+
+ // If we allocated something
+ if (Resources.greaterThan(rc, clusterResource,
+ result.getResourceToBeAllocated(), Resources.none())) {
+ Resource allocatedResource = result.getResourceToBeAllocated();
+ Container updatedContainer = result.getUpdatedContainer();
+
+ assignment.setResource(allocatedResource);
+ assignment.setType(result.getContainerNodeType());
+
+ if (result.getIsReserved()) {
+ // This is a reserved container
+ LOG.info("Reserved container " + " application=" + getApplicationId()
+ + " resource=" + allocatedResource + " queue="
+ + this.toString() + " cluster=" + clusterResource);
assignment.getAssignmentInformation().addReservationDetails(
- container.getId(), getCSLeafQueue().getQueuePath());
+ updatedContainer.getId(), getCSLeafQueue().getQueuePath());
assignment.getAssignmentInformation().incrReservations();
Resources.addTo(assignment.getAssignmentInformation().getReserved(),
- request.getCapability());
- return assignment;
+ allocatedResource);
+ assignment.setFulfilledReservation(true);
+ } else {
+ // This is a new container
+ LOG.info("assignedContainer" + " application attempt="
+ + getApplicationAttemptId() + " container="
+ + updatedContainer.getId() + " queue=" + this + " clusterResource="
+ + clusterResource);
+
+ // Inform the ordering policy
+ getCSLeafQueue().getOrderingPolicy().containerAllocated(this,
+ getRMContainer(updatedContainer.getId()));
+
+ assignment.getAssignmentInformation().addAllocationDetails(
+ updatedContainer.getId(), getCSLeafQueue().getQueuePath());
+ assignment.getAssignmentInformation().incrAllocations();
+ Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
+ allocatedResource);
}
- return new CSAssignment(Resources.none(), type);
}
+
+ return assignment;
}
-
- private boolean checkHeadroom(Resource clusterResource,
- ResourceLimits currentResourceLimits, Resource required, FiCaSchedulerNode node) {
- // If headroom + currentReservation < required, we cannot allocate this
- // require
- Resource resourceCouldBeUnReserved = getCurrentReservation();
- if (!getCSLeafQueue().getReservationContinueLooking() || !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
- // If we don't allow reservation continuous looking, OR we're looking at
- // non-default node partition, we won't allow to unreserve before
- // allocation.
- resourceCouldBeUnReserved = Resources.none();
- }
- return Resources
- .greaterThanOrEqual(rc, clusterResource, Resources.add(
- currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved),
- required);
- }
-
+
public CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
SchedulingMode schedulingMode) {
@@ -886,174 +507,45 @@ public CSAssignment assignContainers(Resource clusterResource,
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-label=" + node.getPartition());
}
- return SKIP_ASSIGNMENT;
+ return CSAssignment.SKIP_ASSIGNMENT;
}
synchronized (this) {
- // Check if this resource is on the blacklist
- if (SchedulerAppUtils.isBlacklisted(this, node, LOG)) {
- return SKIP_ASSIGNMENT;
- }
-
// Schedule in priority order
for (Priority priority : getPriorities()) {
- ResourceRequest anyRequest =
- getResourceRequest(priority, ResourceRequest.ANY);
- if (null == anyRequest) {
- continue;
- }
-
- // Required resource
- Resource required = anyRequest.getCapability();
-
- // Do we need containers at this 'priority'?
- if (getTotalRequiredResources(priority) <= 0) {
- continue;
- }
-
- // AM container allocation doesn't support non-exclusive allocation to
- // avoid painful of preempt an AM container
- if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
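+ // Delegate the per-priority scheduling decision to the container
+ // allocator; the checks removed from this method (headroom, locality
+ // delay, non-partitioned-request opportunities, reservation handling)
+ // move into the allocator.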
+ ContainerAllocation allocationResult =
+ applicationContainerAllocator.allocate(clusterResource, node,
+ schedulingMode, currentResourceLimits, priority, null);
- RMAppAttempt rmAppAttempt =
- rmContext.getRMApps()
- .get(getApplicationId()).getCurrentAppAttempt();
- if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false
- && null == rmAppAttempt.getMasterContainer()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skip allocating AM container to app_attempt="
- + getApplicationAttemptId()
- + ", don't allow to allocate AM container in non-exclusive mode");
- }
- break;
- }
- }
+ // If the allocator skipped this priority, move on to the next one
+ AllocationState allocationState = allocationResult.getAllocationState();
- // Is the node-label-expression of this offswitch resource request
- // matches the node's label?
- // If not match, jump to next priority.
- if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
- anyRequest, node.getPartition(), schedulingMode)) {
+ if (allocationState == AllocationState.PRIORITY_SKIPPED) {
continue;
}
-
- if (!getCSLeafQueue().getReservationContinueLooking()) {
- if (!shouldAllocOrReserveNewContainer(priority, required)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("doesn't need containers based on reservation algo!");
- }
- continue;
- }
- }
-
- if (!checkHeadroom(clusterResource, currentResourceLimits, required,
- node)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("cannot allocate required resource=" + required
- + " because of headroom");
- }
- return NULL_ASSIGNMENT;
- }
-
- // Inform the application it is about to get a scheduling opportunity
- addSchedulingOpportunity(priority);
-
- // Increase missed-non-partitioned-resource-request-opportunity.
- // This is to make sure non-partitioned-resource-request will prefer
- // to be allocated to non-partitioned nodes
- int missedNonPartitionedRequestSchedulingOpportunity = 0;
- if (anyRequest.getNodeLabelExpression().equals(
- RMNodeLabelsManager.NO_LABEL)) {
- missedNonPartitionedRequestSchedulingOpportunity =
- addMissedNonPartitionedRequestSchedulingOpportunity(priority);
- }
-
- if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
- // Before doing allocation, we need to check scheduling opportunity to
- // make sure : non-partitioned resource request should be scheduled to
- // non-partitioned partition first.
- if (missedNonPartitionedRequestSchedulingOpportunity < rmContext
- .getScheduler().getNumClusterNodes()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skip app_attempt="
- + getApplicationAttemptId() + " priority="
- + priority
- + " because missed-non-partitioned-resource-request"
- + " opportunity under requred:" + " Now="
- + missedNonPartitionedRequestSchedulingOpportunity
- + " required="
- + rmContext.getScheduler().getNumClusterNodes());
- }
-
- return SKIP_ASSIGNMENT;
- }
- }
-
- // Try to schedule
- CSAssignment assignment =
- assignContainersOnNode(clusterResource, node,
- priority, null, schedulingMode, currentResourceLimits);
-
- // Did the application skip this node?
- if (assignment.getSkipped()) {
- // Don't count 'skipped nodes' as a scheduling opportunity!
- subtractSchedulingOpportunity(priority);
- continue;
- }
-
- // Did we schedule or reserve a container?
- Resource assigned = assignment.getResource();
- if (Resources.greaterThan(rc, clusterResource,
- assigned, Resources.none())) {
- // Don't reset scheduling opportunities for offswitch assignments
- // otherwise the app will be delayed for each non-local assignment.
- // This helps apps with many off-cluster requests schedule faster.
- if (assignment.getType() != NodeType.OFF_SWITCH) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Resetting scheduling opportunities");
- }
- resetSchedulingOpportunities(priority);
- }
- // Non-exclusive scheduling opportunity is different: we need reset
- // it every time to make sure non-labeled resource request will be
- // most likely allocated on non-labeled nodes first.
- resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
-
- // Done
- return assignment;
- } else {
- // Do not assign out of order w.r.t priorities
- return SKIP_ASSIGNMENT;
- }
+ return getCSAssignmentFromAllocateResult(clusterResource,
+ allocationResult);
}
}
- return SKIP_ASSIGNMENT;
+ // We will reach here if we skipped all priorities of the app, so we will
+ // skip the app.
+ return CSAssignment.SKIP_ASSIGNMENT;
}
public synchronized CSAssignment assignReservedContainer(
FiCaSchedulerNode node, RMContainer rmContainer,
Resource clusterResource, SchedulingMode schedulingMode) {
- // Do we still need this reservation?
- Priority priority = rmContainer.getReservedPriority();
- if (getTotalRequiredResources(priority) == 0) {
- // Release
- return new CSAssignment(this, rmContainer);
- }
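+ // Re-evaluate the reserved container through the allocator. Empty resource
+ // limits are passed because the reservation was already charged when it
+ // was made, so "re-reservation" is effectively free.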
+ ContainerAllocation result =
+ applicationContainerAllocator.allocate(clusterResource, node,
+ schedulingMode, new ResourceLimits(Resources.none()),
+ rmContainer.getReservedPriority(), rmContainer);
- // Try to assign if we have sufficient resources
- CSAssignment tmp =
- assignContainersOnNode(clusterResource, node, priority,
- rmContainer, schedulingMode, new ResourceLimits(Resources.none()));
-
- // Doesn't matter... since it's already charged for at time of reservation
- // "re-reservation" is *free*
- CSAssignment ret = new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
- if (tmp.getAssignmentInformation().getNumAllocations() > 0) {
- ret.setFulfilledReservation(true);
- }
- return ret;
+ return getCSAssignmentFromAllocateResult(clusterResource, result);
}
+ public RMContext getRMContext() {
+ return this.rmContext;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index d225bd0..8954321 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -1487,7 +1487,7 @@ public void testReservationExchange() throws Exception {
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
- CSAssignment assignment = a.assignContainers(clusterResource, node_0,
+ a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -1496,7 +1496,20 @@ public void testReservationExchange() throws Exception {
assertEquals(0*GB, node_0.getUsedResource().getMemory());
}
-
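+ // With the allocation logic moved out of FiCaSchedulerApp, these helpers
+ // assert on the returned CSAssignment instead of verifying allocate() calls
+ // on a spied application (the spy() wrappers are removed below).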
+ private void verifyContainerAllocated(CSAssignment assignment,
+     NodeType nodeType) {
+   Assert.assertTrue(Resources.greaterThan(resourceCalculator, null,
+       assignment.getResource(), Resources.none()));
+   Assert.assertTrue(
+       assignment.getAssignmentInformation().getNumAllocations() > 0);
+   Assert.assertEquals(nodeType, assignment.getType());
+ }
+
+ private void verifyNoContainerAllocated(CSAssignment assignment) {
+   Assert.assertTrue(Resources.equals(assignment.getResource(),
+       Resources.none()));
+   Assert.assertTrue(
+       assignment.getAssignmentInformation().getNumAllocations() == 0);
+ }
@Test
public void testLocalityScheduling() throws Exception {
@@ -1510,11 +1523,11 @@ public void testLocalityScheduling() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
- FiCaSchedulerApp app_0 =
- spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), spyRMContext));
+ FiCaSchedulerApp app_0 =
+ new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
-
+
// Setup some nodes and racks
String host_0 = "127.0.0.1";
String rack_0 = "rack_0";
@@ -1559,8 +1572,7 @@ public void testLocalityScheduling() throws Exception {
// Start with off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
- any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ verifyNoContainerAllocated(assignment);
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(3, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
@@ -1568,8 +1580,7 @@ public void testLocalityScheduling() throws Exception {
// Another off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
- any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ verifyNoContainerAllocated(assignment);
assertEquals(2, app_0.getSchedulingOpportunities(priority));
assertEquals(3, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
@@ -1577,8 +1588,7 @@ public void testLocalityScheduling() throws Exception {
// Another off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
- any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ verifyNoContainerAllocated(assignment);
assertEquals(3, app_0.getSchedulingOpportunities(priority));
assertEquals(3, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
@@ -1587,26 +1597,21 @@ public void testLocalityScheduling() throws Exception {
// since missedOpportunities=3 and reqdContainers=3
assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
- any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset
assertEquals(2, app_0.getTotalRequiredResources(priority));
- assertEquals(NodeType.OFF_SWITCH, assignment.getType());
// NODE_LOCAL - node_0
assignment = a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
- any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(1, app_0.getTotalRequiredResources(priority));
- assertEquals(NodeType.NODE_LOCAL, assignment.getType());
// NODE_LOCAL - node_1
assignment = a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1),
- any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(0, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType());
@@ -1636,16 +1641,13 @@ public void testLocalityScheduling() throws Exception {
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(2, app_0.getTotalRequiredResources(priority));
- assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Should assign RACK_LOCAL now
assignment = a.assignContainers(clusterResource, node_3,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
- any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(1, app_0.getTotalRequiredResources(priority));
- assertEquals(NodeType.RACK_LOCAL, assignment.getType());
}
@Test
@@ -1659,9 +1661,9 @@ public void testApplicationPriorityScheduling() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
- FiCaSchedulerApp app_0 =
- spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), spyRMContext));
+ FiCaSchedulerApp app_0 =
+ new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
// Setup some nodes and racks
@@ -1721,63 +1723,48 @@ public void testApplicationPriorityScheduling() throws Exception {
// Start with off switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
- a.assignContainers(clusterResource, node_2,
+ CSAssignment assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
- eq(priority_1), any(ResourceRequest.class), any(Container.class));
+ verifyNoContainerAllocated(assignment);
assertEquals(1, app_0.getSchedulingOpportunities(priority_1));
assertEquals(2, app_0.getTotalRequiredResources(priority_1));
- verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
- eq(priority_2), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Another off-switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
- a.assignContainers(clusterResource, node_2,
+ assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
- eq(priority_1), any(ResourceRequest.class), any(Container.class));
+ verifyNoContainerAllocated(assignment);
assertEquals(2, app_0.getSchedulingOpportunities(priority_1));
assertEquals(2, app_0.getTotalRequiredResources(priority_1));
- verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
- eq(priority_2), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Another off-switch, shouldn't allocate OFF_SWITCH P1
- a.assignContainers(clusterResource, node_2,
+ assignment = a.assignContainers(clusterResource, node_2,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
- eq(priority_1), any(ResourceRequest.class), any(Container.class));
+ verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
assertEquals(3, app_0.getSchedulingOpportunities(priority_1));
assertEquals(1, app_0.getTotalRequiredResources(priority_1));
- verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
- eq(priority_2), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Now, DATA_LOCAL for P1
- a.assignContainers(clusterResource, node_0,
+ assignment = a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
- eq(priority_1), any(ResourceRequest.class), any(Container.class));
+ verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
assertEquals(0, app_0.getTotalRequiredResources(priority_1));
- verify(app_0, never()).allocate(any(NodeType.class), eq(node_0),
- eq(priority_2), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Now, OFF_SWITCH for P2
- a.assignContainers(clusterResource, node_1,
+ assignment = a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0, never()).allocate(any(NodeType.class), eq(node_1),
- eq(priority_1), any(ResourceRequest.class), any(Container.class));
+ verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
assertEquals(0, app_0.getTotalRequiredResources(priority_1));
- verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_1),
- eq(priority_2), any(ResourceRequest.class), any(Container.class));
assertEquals(1, app_0.getSchedulingOpportunities(priority_2));
assertEquals(0, app_0.getTotalRequiredResources(priority_2));
@@ -1796,8 +1783,8 @@ public void testSchedulingConstraints() throws Exception {
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
- spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), spyRMContext));
+ new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
// Setup some nodes and racks
@@ -1847,19 +1834,17 @@ public void testSchedulingConstraints() throws Exception {
app_0.updateResourceRequests(app_0_requests_0);
// NODE_LOCAL - node_0_1
- a.assignContainers(clusterResource, node_0_0,
+ CSAssignment assignment = a.assignContainers(clusterResource, node_0_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0),
- any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(0, app_0.getTotalRequiredResources(priority));
// No allocation on node_1_0 even though it's node/rack local since
// required(ANY) == 0
- a.assignContainers(clusterResource, node_1_0,
+ assignment = a.assignContainers(clusterResource, node_1_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
- any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ verifyNoContainerAllocated(assignment);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero
// since #req=0
assertEquals(0, app_0.getTotalRequiredResources(priority));
@@ -1873,21 +1858,18 @@ public void testSchedulingConstraints() throws Exception {
// No allocation on node_0_1 even though it's node/rack local since
// required(rack_1) == 0
- a.assignContainers(clusterResource, node_0_1,
+ assignment = a.assignContainers(clusterResource, node_0_1,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
- any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ verifyNoContainerAllocated(assignment);
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(1, app_0.getTotalRequiredResources(priority));
// NODE_LOCAL - node_1
- a.assignContainers(clusterResource, node_1_0,
+ assignment = a.assignContainers(clusterResource, node_1_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
- any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
assertEquals(0, app_0.getTotalRequiredResources(priority));
-
}
@Test (timeout = 30000)
@@ -2065,16 +2047,16 @@ public void testLocalityConstraints() throws Exception {
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
- FiCaSchedulerApp app_0 =
- spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), spyRMContext));
+ FiCaSchedulerApp app_0 =
+ new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
- FiCaSchedulerApp app_1 =
- spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
- mock(ActiveUsersManager.class), spyRMContext));
+ FiCaSchedulerApp app_1 =
+ new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_0);
// Setup some nodes and racks
@@ -2134,10 +2116,10 @@ public void testLocalityConstraints() throws Exception {
// node_0_1
// Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
- a.assignContainers(clusterResource, node_0_1,
- new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
- any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ CSAssignment assignment =
+ a.assignContainers(clusterResource, node_0_1, new ResourceLimits(
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ verifyNoContainerAllocated(assignment);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
// resourceName:
@@ -2157,10 +2139,9 @@ public void testLocalityConstraints() throws Exception {
// node_1_1
// Shouldn't allocate since RR(rack_1) = relax: false
- a.assignContainers(clusterResource, node_1_1,
+ assignment = a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
- any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ verifyNoContainerAllocated(assignment);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
// Allow rack-locality for rack_1, but blacklist node_1_1
@@ -2188,10 +2169,9 @@ public void testLocalityConstraints() throws Exception {
// node_1_1
// Shouldn't allocate since node_1_1 is blacklisted
- a.assignContainers(clusterResource, node_1_1,
+ assignment = a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
- any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ verifyNoContainerAllocated(assignment);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
// Now, remove node_1_1 from blacklist, but add rack_1 to blacklist
@@ -2217,10 +2197,9 @@ public void testLocalityConstraints() throws Exception {
// node_1_1
// Shouldn't allocate since rack_1 is blacklisted
- a.assignContainers(clusterResource, node_1_1,
+ assignment = a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
- any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ verifyNoContainerAllocated(assignment);
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
// Now remove rack_1 from blacklist
@@ -2244,10 +2223,9 @@ public void testLocalityConstraints() throws Exception {
// Blacklist: < host_0_0 > <----
// Now, should allocate since RR(rack_1) = relax: true
- a.assignContainers(clusterResource, node_1_1,
+ assignment = a.assignContainers(clusterResource, node_1_1,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1),
- any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ verifyNoContainerAllocated(assignment);
assertEquals(0, app_0.getSchedulingOpportunities(priority));
assertEquals(1, app_0.getTotalRequiredResources(priority));
@@ -2275,10 +2253,9 @@ public void testLocalityConstraints() throws Exception {
// host_1_0: 8G
// host_1_1: 7G
- a.assignContainers(clusterResource, node_1_0,
+ assignment = a.assignContainers(clusterResource, node_1_0,
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
- any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
assertEquals(0, app_0.getSchedulingOpportunities(priority));
assertEquals(0, app_0.getTotalRequiredResources(priority));