diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 1230cb5c9d8..a0f42fb7ca0 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -115,7 +115,7 @@
-Xmx2048m -XX:+HeapDumpOnOutOfMemoryError
- 2.21.0
+ 2.22.1
${maven-surefire-plugin.version}
${maven-surefire-plugin.version}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 2322b468251..e0d1401c138 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -2627,7 +2627,11 @@ public ResourceUsage getClusterResourceUsage() {
.getContainersToKill().isEmpty()) {
list = new ArrayList<>();
for (RMContainer rmContainer : csAssignment.getContainersToKill()) {
- list.add(getSchedulerContainer(rmContainer, false));
+ SchedulerContainer schedulerContainer =
+ getSchedulerContainer(rmContainer, false);
+ if (schedulerContainer != null) {
+ list.add(schedulerContainer);
+ }
}
}
@@ -2635,10 +2639,16 @@ public ResourceUsage getClusterResourceUsage() {
if (null == list) {
list = new ArrayList<>();
}
- list.add(
- getSchedulerContainer(csAssignment.getExcessReservation(), false));
+ SchedulerContainer schedulerContainer =
+ getSchedulerContainer(csAssignment.getExcessReservation(), false);
+ if (schedulerContainer != null) {
+ list.add(schedulerContainer);
+ }
}
+ if (list != null && list.isEmpty()) {
+ list = null;
+ }
return list;
}
@@ -2723,11 +2733,15 @@ public boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt,
((RMContainerImpl)rmContainer).setAllocationTags(
new HashSet<>(schedulingRequest.getAllocationTags()));
- allocated = new ContainerAllocationProposal<>(
- getSchedulerContainer(rmContainer, true),
- null, null, NodeType.NODE_LOCAL, NodeType.NODE_LOCAL,
- SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
- resource);
+ SchedulerContainer
+ schedulerContainer = getSchedulerContainer(rmContainer, true);
+ if (schedulerContainer == null) {
+ allocated = null;
+ } else {
+ allocated = new ContainerAllocationProposal<>(schedulerContainer,
+ null, null, NodeType.NODE_LOCAL, NodeType.NODE_LOCAL,
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, resource);
+ }
}
if (null != allocated) {
@@ -2757,16 +2771,27 @@ public boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt,
csAssignment.getAssignmentInformation().getAllocationDetails();
if (!allocations.isEmpty()) {
RMContainer rmContainer = allocations.get(0).rmContainer;
- allocated = new ContainerAllocationProposal<>(
- getSchedulerContainer(rmContainer, true),
- getSchedulerContainersToRelease(csAssignment),
- getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
- false), csAssignment.getType(),
- csAssignment.getRequestLocalityType(),
- csAssignment.getSchedulingMode() != null ?
- csAssignment.getSchedulingMode() :
- SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
- csAssignment.getResource());
+ SchedulerContainer
+ schedulerContainer = getSchedulerContainer(rmContainer, true);
+ if (schedulerContainer == null) {
+ allocated = null;
+ // Decrease unconfirmed resource if app is alive
+ FiCaSchedulerApp app = getApplicationAttempt(
+ rmContainer.getApplicationAttemptId());
+ if (app != null) {
+ app.decUnconfirmedRes(rmContainer.getAllocatedResource());
+ }
+ } else {
+ allocated = new ContainerAllocationProposal<>(schedulerContainer,
+ getSchedulerContainersToRelease(csAssignment),
+ getSchedulerContainer(
+ csAssignment.getFulfilledReservedContainer(), false),
+ csAssignment.getType(), csAssignment.getRequestLocalityType(),
+ csAssignment.getSchedulingMode() != null ?
+ csAssignment.getSchedulingMode() :
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
+ csAssignment.getResource());
+ }
}
// Reserved something
@@ -2774,16 +2799,21 @@ public boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt,
csAssignment.getAssignmentInformation().getReservationDetails();
if (!reservation.isEmpty()) {
RMContainer rmContainer = reservation.get(0).rmContainer;
- reserved = new ContainerAllocationProposal<>(
- getSchedulerContainer(rmContainer, false),
- getSchedulerContainersToRelease(csAssignment),
- getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
- false), csAssignment.getType(),
- csAssignment.getRequestLocalityType(),
- csAssignment.getSchedulingMode() != null ?
- csAssignment.getSchedulingMode() :
- SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
- csAssignment.getResource());
+ SchedulerContainer
+ schedulerContainer = getSchedulerContainer(rmContainer, false);
+ if (schedulerContainer == null) {
+ reserved = null;
+ } else {
+ reserved = new ContainerAllocationProposal<>(schedulerContainer,
+ getSchedulerContainersToRelease(csAssignment),
+ getSchedulerContainer(
+ csAssignment.getFulfilledReservedContainer(), false),
+ csAssignment.getType(), csAssignment.getRequestLocalityType(),
+ csAssignment.getSchedulingMode() != null ?
+ csAssignment.getSchedulingMode() :
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
+ csAssignment.getResource());
+ }
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index 840d30dc1d1..6fb89ccc2ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -56,8 +56,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
@@ -843,6 +846,92 @@ private ResourceCommitRequest createAllocateFromReservedProposal(
return new ResourceCommitRequest(allocateProposals, null, null);
}
+ @Test(timeout = 30000)
+ public void testReturnNullWhenGetSchedulerContainer() throws Exception {
+ // disable async-scheduling for simulating complex scenario
+ Configuration disableAsyncConf = new Configuration(conf);
+ disableAsyncConf.setBoolean(
+ CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
+
+ // init RM & NMs
+ final MockRM rm = new MockRM(disableAsyncConf);
+ rm.start();
+ final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 8 * GB);
+ final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 8 * GB);
+ rm.drainEvents();
+ CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
+ SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
+ RMNode rmNode1 = cs.getNode(nm1.getNodeId()).getRMNode();
+ SchedulerNode sn2 = cs.getSchedulerNode(nm2.getNodeId());
+
+ // launch app1-am on nm1
+ RMApp app1 = rm.submitApp(1 * GB, "app1", "user", null, false, "default",
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+ // app2 asks 1 * 1G container
+ am1.allocate(ImmutableList.of(ResourceRequest
+ .newInstance(Priority.newInstance(0), "*",
+ Resources.createResource(1 * GB), 1)), null);
+ RMContainer amContainer = cs.getRMContainer(
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 1));
+
+ // spy CapacityScheduler
+ final CapacityScheduler spyCs = Mockito.spy(cs);
+ // hook CapacityScheduler#submitResourceCommitRequest
+ List assignmentSnapshots = new ArrayList<>();
+ Mockito.doAnswer(new Answer