ps) {
+ N node = null;
+ if (1 == ps.getAllNodes().size()) {
+ node = ps.getAllNodes().values().iterator().next();
+ }
+
+ return node;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceRequestUpdateResult.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceRequestUpdateResult.java
new file mode 100644
index 0000000..da356f5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceRequestUpdateResult.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+
+/**
+ * Result of ResourceRequest update
+ */
+public class ResourceRequestUpdateResult {
+ private final ResourceRequest lastAnyResourceRequest;
+ private final ResourceRequest newResourceRequest;
+
+ public ResourceRequestUpdateResult(ResourceRequest lastAnyResourceRequest,
+ ResourceRequest newResourceRequest) {
+ this.lastAnyResourceRequest = lastAnyResourceRequest;
+ this.newResourceRequest = newResourceRequest;
+ }
+
+ public ResourceRequest getLastAnyResourceRequest() {
+ return lastAnyResourceRequest;
+ }
+
+ public ResourceRequest getNewResourceRequest() {
+ return newResourceRequest;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
new file mode 100644
index 0000000..02ac457
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ * In addition to {@link PlacementSet}, this also maintains
+ * pending ResourceRequests:
+ * - When new ResourceRequest(s) added to scheduler, or,
+ * - Or new container allocated, scheduler can notify corresponding
+ * PlacementSet.
+ *
+ *
+ *
+ * Different set of resource requests (E.g., resource requests with the
+ * same schedulerKey) can have one instance of PlacementSet, each PlacementSet
+ * can have different ways to order nodes depends on requests.
+ *
+ */
+public interface SchedulingPlacementSet
+ extends PlacementSet {
+ /**
+ * Get iterator of preferred node depends on requirement and/or availability
+ * @param clusterPlacementSet input cluster PlacementSet
+ * @return iterator of preferred node
+ */
+ Iterator getPreferredNodeIterator(PlacementSet clusterPlacementSet);
+
+ /**
+ * Replace existing ResourceRequest by the new requests
+ *
+ * @param requests new ResourceRequests
+ * @param recoverPreemptedRequestForAContainer if we're recovering resource
+ * requests for preempted container
+ * @return true if total pending resource changed
+ */
+ ResourceRequestUpdateResult updateResourceRequests(
+ Collection requests,
+ boolean recoverPreemptedRequestForAContainer);
+
+ /**
+ * Get pending ResourceRequests by given schedulerRequestKey
+ * @return Map of resourceName to ResourceRequest
+ */
+ Map getResourceRequests();
+
+ /**
+ * Get ResourceRequest by given schedulerKey and resourceName
+ * @param resourceName resourceName
+ * @return ResourceRequest
+ */
+ ResourceRequest getResourceRequest(String resourceName);
+
+ /**
+ * Notify container allocated.
+ * @param type Type of the allocation
+ * @param node Which node this container allocated on
+ * @param request Which resource request to allocate
+ * @return list of ResourceRequests deducted
+ */
+ List allocate(NodeType type, SchedulerNode node,
+ ResourceRequest request);
+
+ /**
+ * We can still have pending requirement for a given NodeType and node
+ * @param type Locality Type
+ * @param node which node we will allocate on
+ * @return true if we has pending requirement
+ */
+ boolean canAllocate(NodeType type, SchedulerNode node);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java
new file mode 100644
index 0000000..48efaa1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A simple PlacementSet which keeps an unordered map
+ */
+public class SimplePlacementSet
+ implements PlacementSet {
+
+ private Map map;
+ private String partition;
+
+ public SimplePlacementSet(N node) {
+ if (null != node) {
+ // Only one node in the initial PlacementSet
+ this.map = ImmutableMap.of(node.getNodeID(), node);
+ this.partition = node.getPartition();
+ } else {
+ this.map = Collections.emptyMap();
+ this.partition = NodeLabel.DEFAULT_NODE_LABEL_PARTITION;
+ }
+ }
+
+ public SimplePlacementSet(Map map, String partition) {
+ this.map = map;
+ this.partition = partition;
+ }
+
+ @Override
+ public Map getAllNodes() {
+ return map;
+ }
+
+ @Override
+ public long getVersion() {
+ return 0L;
+ }
+
+ @Override
+ public String getPartition() {
+ return partition;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
index 7bec03a..b7cb1bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
+import java.util.concurrent.ConcurrentSkipListSet;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -35,7 +37,7 @@
private static final Log LOG = LogFactory.getLog(OrderingPolicy.class);
- protected TreeSet schedulableEntities;
+ protected ConcurrentSkipListSet schedulableEntities;
protected Comparator comparator;
protected Map entitiesToReorder = new HashMap();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java
index 3cfcd7a..3371df8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
+import java.util.concurrent.ConcurrentSkipListSet;
import com.google.common.annotations.VisibleForTesting;
@@ -61,7 +62,7 @@ public FairOrderingPolicy() {
comparators
);
this.comparator = fairComparator;
- this.schedulableEntities = new TreeSet(comparator);
+ this.schedulableEntities = new ConcurrentSkipListSet(comparator);
}
private double getMagnitude(SchedulableEntity r) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java
index 10f8eeb..2d066bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
+import java.util.concurrent.ConcurrentSkipListSet;
+
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
/**
@@ -32,7 +34,7 @@ public FifoOrderingPolicy() {
comparators.add(new PriorityComparator());
comparators.add(new FifoComparator());
this.comparator = new CompoundComparator(comparators);
- this.schedulableEntities = new TreeSet(comparator);
+ this.schedulableEntities = new ConcurrentSkipListSet(comparator);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java
index 0891289..6ced9e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java
@@ -21,6 +21,7 @@
import java.util.*;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import java.util.concurrent.ConcurrentSkipListSet;
/**
* This ordering policy is used for pending applications only.
@@ -46,7 +47,7 @@ public FifoOrderingPolicyForPendingApps() {
comparators.add(new PriorityComparator());
comparators.add(new FifoComparator());
this.comparator = new CompoundComparator(comparators);
- this.schedulableEntities = new TreeSet(comparator);
+ this.schedulableEntities = new ConcurrentSkipListSet(comparator);
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 25a8288..3d07b81 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -23,6 +23,7 @@
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collection;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -167,6 +168,28 @@ public void drainEvents() {
}
}
+ private void waitForState(ApplicationId appId, EnumSet finalStates)
+ throws InterruptedException {
+ RMApp app = getRMContext().getRMApps().get(appId);
+ Assert.assertNotNull("app shouldn't be null", app);
+ final int timeoutMsecs = 80 * SECOND;
+ int timeWaiting = 0;
+ while (!finalStates.contains(app.getState())) {
+ if (timeWaiting >= timeoutMsecs) {
+ break;
+ }
+
+ LOG.info("App : " + appId + " State is : " + app.getState() +
+ " Waiting for state : " + finalStates);
+ Thread.sleep(WAIT_MS_PER_LOOP);
+ timeWaiting += WAIT_MS_PER_LOOP;
+ }
+
+ LOG.info("App State is : " + app.getState());
+ Assert.assertTrue("App State is not correct (timeout).",
+ finalStates.contains(app.getState()));
+ }
+
/**
* Wait until an application has reached a specified state.
* The timeout is 80 seconds.
@@ -254,7 +277,7 @@ public static void waitForState(RMAppAttempt attempt,
RMAppAttemptState finalState, int timeoutMsecs)
throws InterruptedException {
int timeWaiting = 0;
- while (!finalState.equals(attempt.getAppAttemptState())) {
+ while (finalState != attempt.getAppAttemptState()) {
if (timeWaiting >= timeoutMsecs) {
break;
}
@@ -267,7 +290,7 @@ public static void waitForState(RMAppAttempt attempt,
LOG.info("Attempt State is : " + attempt.getAppAttemptState());
Assert.assertEquals("Attempt state is not correct (timeout).", finalState,
- attempt.getState());
+ attempt.getState());
}
public void waitForContainerToComplete(RMAppAttempt attempt,
@@ -959,6 +982,26 @@ private static void waitForSchedulerAppAttemptAdded(
rm.getResourceScheduler()).getApplicationAttempt(attemptId));
}
+ public static MockAM launchAMWhenAsyncSchedulingEnabled(RMApp app, MockRM rm)
+ throws Exception {
+ int i = 0;
+ while (app.getCurrentAppAttempt() == null) {
+ if (i < 100) {
+ i++;
+ }
+ Thread.sleep(50);
+ }
+
+ RMAppAttempt attempt = app.getCurrentAppAttempt();
+
+ rm.waitForState(attempt.getAppAttemptId(),
+ RMAppAttemptState.ALLOCATED);
+ MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+ rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
+
+ return am;
+ }
+
/**
* NOTE: nm.nodeHeartbeat is explicitly invoked,
* don't invoke it before calling launchAM
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index cee9086..ec11c3ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -1091,7 +1091,7 @@ public ApplicationReport createAndGetApplicationReport(
rmContext.getScheduler().getSchedulerAppInfo(attemptId)
.getLiveContainers()).thenReturn(rmContainers);
ContainerStatus cs = mock(ContainerStatus.class);
- when(containerimpl.getFinishedStatus()).thenReturn(cs);
+ when(containerimpl.completed()).thenReturn(false);
when(containerimpl.getDiagnosticsInfo()).thenReturn("N/A");
when(containerimpl.getContainerExitStatus()).thenReturn(0);
when(containerimpl.getContainerState()).thenReturn(ContainerState.COMPLETE);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java
index 56d38fb..83a354d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java
@@ -238,8 +238,10 @@ public void testCapacitySchedulerAllocation() throws Exception {
SchedulerHealth sh =
((CapacityScheduler) resourceManager.getResourceScheduler())
.getSchedulerHealth();
- Assert.assertEquals(2, sh.getAllocationCount().longValue());
- Assert.assertEquals(Resource.newInstance(3 * 1024, 2),
+ // Now SchedulerHealth records last container allocated, aggregated
+ // allocation account will not be changed
+ Assert.assertEquals(1, sh.getAllocationCount().longValue());
+ Assert.assertEquals(Resource.newInstance(1 * 1024, 1),
sh.getResourcesAllocated());
Assert.assertEquals(2, sh.getAggregateAllocationCount().longValue());
Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails()
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
index d335552..f5fc24e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
@@ -686,6 +686,9 @@ public void testHeadroom() throws Exception {
List app_0_1_requests = new ArrayList();
app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
1 * GB, 2, true, priority_1, recordFactory));
+ app_0_1.updateResourceRequests(app_0_1_requests);
+
+ app_0_1_requests.clear();
app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
1 * GB, 2, true, priority_1, recordFactory, "y"));
app_0_1.updateResourceRequests(app_0_1_requests);
@@ -714,6 +717,9 @@ public void testHeadroom() throws Exception {
List app_1_0_requests = new ArrayList();
app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
1 * GB, 2, true, priority_1, recordFactory));
+ app_1_0.updateResourceRequests(app_1_0_requests);
+
+ app_1_0_requests.clear();
app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
1 * GB, 2, true, priority_1, recordFactory, "y"));
app_1_0.updateResourceRequests(app_1_0_requests);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 6bcf949..0768d39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -134,6 +134,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@@ -3450,7 +3451,7 @@ public void testSchedulingOnRemovedNode() throws Exception {
scheduler.handle(new NodeRemovedSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm2.getNodeId())));
// schedulerNode is removed, try allocate a container
- scheduler.allocateContainersToNode(node);
+ scheduler.allocateContainersToNode(new SimplePlacementSet<>(node), true);
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
new AppAttemptRemovedSchedulerEvent(
@@ -3696,4 +3697,57 @@ private ApplicationAttemptId appHelper(MockRM rm, CapacityScheduler cs,
cs.handle(addAttemptEvent1);
return appAttemptId1;
}
+
+ @Test
+ public void testAppAttemptLocalityStatistics() throws Exception {
+ Configuration conf =
+ TestUtils.getConfigurationWithMultipleQueues(new Configuration(false));
+ conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+
+ final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+ mgr.init(conf);
+
+ MockRM rm = new MockRM(conf) {
+ protected RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm.start();
+ MockNM nm1 =
+ new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
+ nm1.registerNode();
+
+ // Launch app1 in queue=a1
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a");
+
+ // Got one offswitch request and offswitch allocation
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+ // am1 asks for 1 GB resource on h1/default-rack/offswitch
+ am1.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(1), "*",
+ Resources.createResource(1 * GB), 2), ResourceRequest
+ .newInstance(Priority.newInstance(1), "/default-rack",
+ Resources.createResource(1 * GB), 2), ResourceRequest
+ .newInstance(Priority.newInstance(1), "h1",
+ Resources.createResource(1 * GB), 1)), null);
+
+ CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
+
+ // Got one nodelocal request and nodelocal allocation
+ cs.nodeUpdate(rm.getRMContext().getRMNodes().get(nm1.getNodeId()));
+
+ // Got one nodelocal request and racklocal allocation
+ cs.nodeUpdate(rm.getRMContext().getRMNodes().get(nm1.getNodeId()));
+
+ RMAppAttemptMetrics attemptMetrics = rm.getRMContext().getRMApps().get(
+ app1.getApplicationId()).getCurrentAppAttempt()
+ .getRMAppAttemptMetrics();
+
+ // We should get one node-local allocation, one rack-local allocation
+ // And one off-switch allocation
+ Assert.assertArrayEquals(new int[][] { { 1, 0, 0 }, { 0, 1, 0 }, { 0, 0, 1 } },
+ attemptMetrics.getLocalityStatistics());
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
new file mode 100644
index 0000000..9854a15
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestCapacitySchedulerAsyncScheduling {
+ private final int GB = 1024;
+
+ private YarnConfiguration conf;
+
+ RMNodeLabelsManager mgr;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.setBoolean(
+ CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
+ mgr = new NullRMNodeLabelsManager();
+ mgr.init(conf);
+ }
+
+ @Test(timeout = 300000)
+ public void testSingleThreadAsyncContainerAllocation() throws Exception {
+ testAsyncContainerAllocation(1);
+ }
+
+ @Test(timeout = 300000)
+ public void testTwoThreadsAsyncContainerAllocation() throws Exception {
+ testAsyncContainerAllocation(2);
+ }
+
+ @Test(timeout = 300000)
+ public void testThreeThreadsAsyncContainerAllocation() throws Exception {
+ testAsyncContainerAllocation(3);
+ }
+
+ public void testAsyncContainerAllocation(int numThreads) throws Exception {
+ conf.setInt(
+ CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+ numThreads);
+ conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ + ".scheduling-interval-ms", 100);
+
+ final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+ mgr.init(conf);
+
+ // inject node label manager
+ MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm.getRMContext().setNodeLabelManager(mgr);
+ rm.start();
+
+ List